[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19222 **[Test build #88619 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88619/testReport)** for PR 19222 at commit [`c53b6b8`](https://github.com/apache/spark/commit/c53b6b839be6a69e9f05d2be258d5342bdeb4632). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19222 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1776/ Test PASSed.
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19222 Merged build finished. Test PASSed.
[GitHub] spark pull request #20844: [SPARK-23707][SQL] Don't need shuffle exchange wi...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/20844#discussion_r177311524 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala --- @@ -39,7 +39,7 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext { def computeChiSquareTest(): Double = { val n = 1 // Trigger a sort - val data = spark.range(0, n, 1, 1).sort('id) + val data = spark.range(0, n, 1, 2).sort('id) --- End diff -- Why change this?
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177306568 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java --- @@ -69,8 +64,9 @@ public long size() { * Fill this all with 0L. */ public void zeroOut() { +long baseOffset = memory.getBaseOffset(); for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) { --- End diff -- Good catch. These three places that were left unchanged seem to lead to failures.
[GitHub] spark issue #20906: [SPARK-23561][SS] Pull continuous processing out of Writ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20906 Merged build finished. Test PASSed.
[GitHub] spark issue #20906: [SPARK-23561][SS] Pull continuous processing out of Writ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20906 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88611/ Test PASSed.
[GitHub] spark issue #20906: [SPARK-23561][SS] Pull continuous processing out of Writ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20906 **[Test build #88611 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88611/testReport)** for PR 20906 at commit [`26c1ead`](https://github.com/apache/spark/commit/26c1eadc67ffc48bcaf877154660982455892389).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20756#discussion_r177302845 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala --- @@ -68,6 +68,32 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { mapEncoder.serializer.head, mapExpected, mapInputRow) } + test("SPARK-23593: InitializeJavaBean should support interpreted execution") { +val list = new java.util.LinkedList[Int]() +list.add(1) + +val initializeBean = InitializeJavaBean(Literal.fromObject(new java.util.LinkedList[Int]), + Map("add" -> Literal(1))) +checkEvaluation(initializeBean, list, InternalRow.fromSeq(Seq())) + +val initializeWithNonexistingMethod = InitializeJavaBean( + Literal.fromObject(new java.util.LinkedList[Int]), + Map("nonexisting" -> Literal(1))) +checkExceptionInExpression[Exception](initializeWithNonexistingMethod, + InternalRow.fromSeq(Seq()), + """A method named "nonexisting" is not declared in any enclosing class """ + +"nor any supertype") + +val initializeWithWrongParamType = InitializeJavaBean( + Literal.fromObject(new TestBean), + Map("setX" -> Literal("1"))) +intercept[Exception] { + evaluateWithoutCodegen(initializeWithWrongParamType, InternalRow.fromSeq(Seq())) +}.getMessage.contains( --- End diff -- For codegen, the compile exception looks like this:
```
No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void org.apache.spark.sql.catalyst.expressions.TestBean.setX(int)"
```
I'm not sure we want to match this kind of exception message exactly in interpreted execution. It might be a little overkill to do that by looking up methods with the same name, so currently I only test interpreted execution.
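For comparison with the codegen error quoted in this comment, here is a minimal plain-Java sketch of how an interpreted path could behave when a setter receives the wrong argument type. It is not Spark's `InitializeJavaBean` implementation: the `TestBean` class and the `invokeByName` helper are hypothetical stand-ins. The helper looks up a public one-argument method by name via reflection and calls `Method.invoke`, which unboxes an `Integer` for an `int` parameter but throws `IllegalArgumentException` when handed a `String`, so the failure surfaces at evaluation time rather than at compile time.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical stand-in for the TestBean class used in the Spark test suite.
class TestBean {
    private int x;

    public void setX(int x) { this.x = x; }

    public int getX() { return x; }
}

class ReflectiveSetter {
    // Looks up a public one-argument method by name and invokes it on the target.
    // Method.invoke unboxes an Integer for an int parameter; an argument of an
    // incompatible type (e.g. String for int) raises IllegalArgumentException.
    static void invokeByName(Object target, String name, Object arg) {
        for (Method m : target.getClass().getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == 1) {
                try {
                    m.invoke(target, arg);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
                return;
            }
        }
        throw new UnsupportedOperationException(
            "No public one-argument method named " + name + " on " + target.getClass().getName());
    }
}
```

Reproducing the exact codegen message from such a path would mean inspecting every candidate overload with the same name, which is the extra work the comment considers overkill.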
[GitHub] spark issue #20756: [SPARK-23593][SQL] Add interpreted execution for Initial...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20756 **[Test build #88618 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88618/testReport)** for PR 20756 at commit [`e7640e1`](https://github.com/apache/spark/commit/e7640e14ac8eea8ba5521d80f640664e5753bb9f).
[GitHub] spark issue #20756: [SPARK-23593][SQL] Add interpreted execution for Initial...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20756 Merged build finished. Test PASSed.
[GitHub] spark issue #20756: [SPARK-23593][SQL] Add interpreted execution for Initial...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20756 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1775/ Test PASSed.
[GitHub] spark issue #20906: [SPARK-23561][SS] Pull continuous processing out of Writ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20906 Merged build finished. Test FAILed.
[GitHub] spark issue #20906: [SPARK-23561][SS] Pull continuous processing out of Writ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20906 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88614/ Test FAILed.
[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20756#discussion_r177302386 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala --- @@ -68,6 +68,23 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { mapEncoder.serializer.head, mapExpected, mapInputRow) } + test("SPARK-23593: InitializeJavaBean should support interpreted execution") { +val list = new java.util.LinkedList[Int]() +list.add(1) + +val initializeBean = InitializeJavaBean(Literal.fromObject(new java.util.LinkedList[Int]), + Map("add" -> Literal(1))) +checkEvaluation(initializeBean, list, InternalRow.fromSeq(Seq())) + +val initializeWithNonexistingMethod = InitializeJavaBean( + Literal.fromObject(new java.util.LinkedList[Int]), --- End diff -- Added below. Note that because a generic method's parameter type is `Object`, it doesn't make sense to test `LinkedList[Int]` with something like `Map("add" -> Literal("a string"))`.
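The point about generic methods can be checked in plain Java: after type erasure, `LinkedList<E>.add(E)` is `add(Object)` at runtime, so a reflective call with a `String` on a list declared as `LinkedList<Integer>` simply succeeds and there is no type error to test for. A small sketch (the `ErasureDemo` class is hypothetical, for illustration only):

```java
import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.List;

class ErasureDemo {
    // After type erasure, LinkedList<E>.add(E) is add(Object) at runtime.
    static Class<?> addParameterType() {
        for (Method m : LinkedList.class.getMethods()) {
            if (m.getName().equals("add") && m.getParameterCount() == 1) {
                return m.getParameterTypes()[0];
            }
        }
        throw new IllegalStateException("add(E) not found");
    }

    // Reflectively calls add("a string") on a list declared as LinkedList<Integer>.
    // The call succeeds because nothing checks the element type at runtime.
    @SuppressWarnings("unchecked")
    static List<Object> addStringReflectively() {
        LinkedList<Integer> ints = new LinkedList<>();
        try {
            Method add = LinkedList.class.getMethod("add", Object.class);
            add.invoke(ints, "a string");
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
        return (List<Object>) (List<?>) ints;
    }
}
```

A wrong-type test is therefore only meaningful against a non-generic setter such as `setX(int)`.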
[GitHub] spark issue #20906: [SPARK-23561][SS] Pull continuous processing out of Writ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20906 **[Test build #88614 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88614/testReport)** for PR 20906 at commit [`4a7a4cc`](https://github.com/apache/spark/commit/4a7a4cc4aef2e27025e019d8dde29c30026cb330).

* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20784: [SPARK-23639][SQL]Obtain token before init metastore cli...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20784 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1774/ Test PASSed.
[GitHub] spark issue #20784: [SPARK-23639][SQL]Obtain token before init metastore cli...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20784 **[Test build #88617 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88617/testReport)** for PR 20784 at commit [`1e38840`](https://github.com/apache/spark/commit/1e38840c6611373430c8c6bfc50ef2f09beeaf2d).
[GitHub] spark issue #20784: [SPARK-23639][SQL]Obtain token before init metastore cli...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20784 Merged build finished. Test PASSed.
[GitHub] spark issue #20753: [SPARK-23582][SQL] StaticInvoke should support interpret...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20753 @hvanhovell Sorry for pinging again.
[GitHub] spark issue #20850: [SPARK-23713][SQL] Cleanup UnsafeWriter and BufferHolder...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20850 retest this please
[GitHub] spark issue #20850: [SPARK-23713][SQL] Cleanup UnsafeWriter and BufferHolder...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20850 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1773/ Test PASSed.
[GitHub] spark issue #20850: [SPARK-23713][SQL] Cleanup UnsafeWriter and BufferHolder...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20850 **[Test build #88616 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88616/testReport)** for PR 20850 at commit [`3637a5c`](https://github.com/apache/spark/commit/3637a5c171ab856051b64bdd3fe01d40c5b2b569).
[GitHub] spark issue #20850: [SPARK-23713][SQL] Cleanup UnsafeWriter and BufferHolder...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20850 Merged build finished. Test PASSed.
[GitHub] spark issue #20797: [SPARK-23583][SQL] Invoke should support interpreted exe...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20797 @hvanhovell Sorry for pinging again.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177296492 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java --- @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import com.google.common.primitives.Ints; + +import org.apache.spark.unsafe.Platform; + +/** + * A consecutive block of memory with a long array on Java heap. 
+ */ +public final class OnHeapMemoryBlock extends MemoryBlock { + + private final long[] array; + + public OnHeapMemoryBlock(long[] obj, long offset, long size) { +super(obj, offset, size); +this.array = obj; +assert(offset + size <= obj.length * 8L + Platform.LONG_ARRAY_OFFSET) : + "The sum of size " + size + " and offset " + offset + " should not be larger than " + +"the array size " + (obj.length * 8L + Platform.LONG_ARRAY_OFFSET); + } + + public OnHeapMemoryBlock(long size) { +this(new long[Ints.checkedCast((size + 7) / 8)], Platform.LONG_ARRAY_OFFSET, size); + } + + @Override + public MemoryBlock subBlock(long offset, long size) { +checkSubBlockRange(offset, size); +return new OnHeapMemoryBlock(array, this.offset + offset, size); + } + + public long[] getLongArray() { return array; } + + /** + * Creates a memory block pointing to the memory used by the long array. + */ + public static OnHeapMemoryBlock fromArray(final long[] array) { +return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + } + + public static OnHeapMemoryBlock fromArray(final long[] array, long size) { +return new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size); + } + + @Override + public final int getInt(long offset) { +return Platform.getInt(array, this.offset + offset); + } + + @Override + public final void putInt(long offset, int value) { +Platform.putInt(array, this.offset + offset, value); + } + + @Override + public final boolean getBoolean(long offset) { +return Platform.getBoolean(array, this.offset + offset); + } + + @Override + public final void putBoolean(long offset, boolean value) { +Platform.putBoolean(array, this.offset + offset, value); + } + + @Override + public final byte getByte(long offset) { +return Platform.getByte(array, this.offset + offset); + } + + @Override + public final void putByte(long offset, byte value) { +Platform.putByte(array, this.offset + offset, value); + } + + @Override + public final short getShort(long offset) 
{ +return Platform.getShort(array, this.offset + offset); + } + + @Override + public final void putShort(long offset, short value) { +Platform.putShort(array, this.offset + offset, value); + } + + @Override + public final long getLong(long offset) { +return Platform.getLong(array, this.offset + offset); --- End diff -- Shall we also apply https://github.com/apache/spark/pull/19222/files#diff-b9576c68b154d5e554671ffa84bfa74eR80 here?
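The `OnHeapMemoryBlock(long size)` constructor in this diff rounds the byte size up to whole `long` words with `(size + 7) / 8`, and `subBlock` composes offsets as `this.offset + offset`. A minimal model of that arithmetic, assuming a hypothetical `LongWordBlock` class that indexes a plain `long[]` instead of going through `Platform`/`Unsafe` (so there is no `LONG_ARRAY_OFFSET` header to add, and only 8-byte-aligned reads are supported), might look like this:

```java
// Hypothetical, simplified model of OnHeapMemoryBlock's offset arithmetic.
// It indexes a long[] directly instead of using Platform/Unsafe, so offsets
// start at 0 rather than at Platform.LONG_ARRAY_OFFSET.
final class LongWordBlock {
    private final long[] array;
    private final long offset; // byte offset of this block within the array
    private final long length; // size of this block in bytes

    LongWordBlock(long[] array, long offset, long length) {
        if (offset < 0 || length < 0 || offset + length > array.length * 8L) {
            throw new ArrayIndexOutOfBoundsException(
                "offset " + offset + " + length " + length + " exceeds " + array.length * 8L);
        }
        this.array = array;
        this.offset = offset;
        this.length = length;
    }

    // Rounds the byte size up to whole 8-byte words, like (size + 7) / 8;
    // Math.toIntExact plays the role of Guava's Ints.checkedCast.
    LongWordBlock(long size) {
        this(new long[Math.toIntExact((size + 7) / 8)], 0, size);
    }

    long size() { return length; }

    int wordCount() { return array.length; }

    // Offsets passed by callers are 0-based and relative to this block.
    LongWordBlock subBlock(long off, long size) {
        if (off < 0 || size < 0 || off + size > length) {
            throw new ArrayIndexOutOfBoundsException("bad sub-block range");
        }
        return new LongWordBlock(array, this.offset + off, size);
    }

    long getLong(long off) { return array[wordIndex(off)]; }

    void putLong(long off, long value) { array[wordIndex(off)] = value; }

    // Translates a logical offset into an array index; only aligned reads are modeled.
    private int wordIndex(long off) {
        long physical = this.offset + off;
        if (off < 0 || off + 8 > length || physical % 8 != 0) {
            throw new IllegalArgumentException("unaligned or out-of-range offset " + off);
        }
        return (int) (physical / 8);
    }
}
```

A 20-byte block therefore needs three words, and a sub-block starting at byte 8 shares storage with its parent.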
[GitHub] spark pull request #20908: [SPARK-23672][PYTHON] Document support for nested...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20908#discussion_r177296271 --- Diff: python/pyspark/sql/tests.py --- @@ -3966,6 +3967,15 @@ def random_udf(v): random_udf = random_udf.asNondeterministic() return random_udf +def test_pandas_udf_tokenize(self): +from pyspark.sql.functions import pandas_udf +tokenize = pandas_udf(lambda s: s.apply(lambda str: str.split(' ')), + ArrayType(StringType())) +self.assertEqual(udf.returnType, ArrayType(StringType())) +df = self.spark.createDataFrame([("hi boo",), ("bye boo",)], ["vals"]) +result = df.select(tokenize("vals").alias("hi")) +self.assertEqual([], result.collect()) --- End diff -- Am I missing something? Is it equal to `[]`?
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177296342 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,161 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( +"Length " + length + " and offset " + offset + "must be non-negative"); +} +this.obj = obj; +this.offset = offset; this.length = length; } + protected MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException( +"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Just instantiate the same type of MemoryBlock with new offset and size. The data is not + * copied. If parameters are invalid, an exception is thrown + */ + public abstract MemoryBlock subBlock(long offset, long size); + + protected void checkSubBlockRange(long offset, long size) { +if (this.offset + offset < 0 || size < 0) { + throw new ArrayIndexOutOfBoundsException( +"Size " + size + " and offset " + (this.offset + offset) + " must be non-negative"); +} +if (offset + size > length) { + throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset " + +offset + " should not be larger than the length " + length + " in the MemoryBlock"); +} + } + + /** + * getXXX/putXXX does not ensure guarantee behavior if the offset is invalid. 
e.g cause illegal + * memory access, throw an exception, or etc. + * getXXX/putXXX uses an index based on this.offset that includes the size of metadata such as + * JVM object header. Thus, the offset is expected as an logical offset in the memory block. --- End diff -- Also mention that the offset is 0-based.
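The `allocateFromObject` factory quoted in this diff picks a `MemoryBlock` subclass from the runtime type of the base object: `byte[]` and `long[]` give on-heap blocks, and a `null` base object is taken to mean an off-heap address. A minimal sketch of that dispatch, using hypothetical `SketchBlock` classes that only record their storage kind (no `Platform` access, no real memory management), could be:

```java
// Hypothetical stand-ins for ByteArrayMemoryBlock / OnHeapMemoryBlock / OffHeapMemoryBlock;
// they only record which kind of storage backs the block.
abstract class SketchBlock {
    final Object base; // null for off-heap
    final long offset;
    final long length;

    SketchBlock(Object base, long offset, long length) {
        this.base = base;
        this.offset = offset;
        this.length = length;
    }
}

final class ByteArraySketchBlock extends SketchBlock {
    ByteArraySketchBlock(byte[] array, long offset, long length) { super(array, offset, length); }
}

final class LongArraySketchBlock extends SketchBlock {
    LongArraySketchBlock(long[] array, long offset, long length) { super(array, offset, length); }
}

final class OffHeapSketchBlock extends SketchBlock {
    OffHeapSketchBlock(long address, long length) { super(null, address, length); }
}

final class SketchBlocks {
    // Same dispatch order as allocateFromObject in the diff: byte[] and long[] map to
    // on-heap blocks, a null base object is treated as an off-heap address.
    static SketchBlock fromObject(Object obj, long offset, long length) {
        if (obj instanceof byte[]) {
            return new ByteArraySketchBlock((byte[]) obj, offset, length);
        } else if (obj instanceof long[]) {
            return new LongArraySketchBlock((long[]) obj, offset, length);
        } else if (obj == null) {
            return new OffHeapSketchBlock(offset, length);
        }
        throw new UnsupportedOperationException(
            "Instantiate block for type " + obj.getClass() + " is not supported");
    }
}
```

Returning directly from each branch avoids the mutable `mb` local used in the diff; the behavior is the same.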
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177296162 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,161 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; - public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); + protected MemoryBlock(@Nullable Object obj, long offset, long length) { +if (offset < 0 || length < 0) { + throw new ArrayIndexOutOfBoundsException( +"Length " + length + " and offset " + offset + "must be non-negative"); +} +this.obj = obj; +this.offset = offset; this.length = length; } + protected MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. 
- */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException( +"Instantiate MemoryBlock for type " + obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Just instantiate the same type of MemoryBlock with new offset and size. The data is not + * copied. If parameters are invalid, an exception is thrown + */ + public abstract MemoryBlock subBlock(long offset, long size); + + protected void checkSubBlockRange(long offset, long size) { +if (this.offset + offset < 0 || size < 0) { --- End diff -- The check should be `offset >= 0`; the sub-block must be a real "sub" block.
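To see why the relative `offset` itself must be non-negative, consider a parent block whose base offset is 16: the check in the diff (`this.offset + offset < 0`) accepts `offset = -8`, which starts the "sub" block before the parent. A hedged sketch of the stricter check suggested in this comment, written as hypothetical static helpers rather than the actual `MemoryBlock` method, also compares `size > length - offset` so that `offset + size` cannot overflow a `long`:

```java
final class SubBlockRange {
    // Mirrors the check under review: a negative relative offset passes as long as
    // baseOffset + offset stays non-negative, i.e. the sub-block may start before the parent.
    static boolean originalCheckPasses(long baseOffset, long length, long offset, long size) {
        if (baseOffset + offset < 0 || size < 0) {
            return false;
        }
        return offset + size <= length;
    }

    // Stricter check: the sub-block must lie entirely inside [0, length) of the parent.
    // Comparing size against (length - offset) avoids overflow in offset + size.
    static void check(long length, long offset, long size) {
        if (offset < 0 || size < 0) {
            throw new ArrayIndexOutOfBoundsException(
                "Size " + size + " and offset " + offset + " must be non-negative");
        }
        if (size > length - offset) {
            throw new ArrayIndexOutOfBoundsException("The sum of size " + size + " and offset "
                + offset + " should not be larger than the length " + length);
        }
    }
}
```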
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177295079 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java --- @@ -89,6 +85,6 @@ public void set(int index, long value) { public long get(int index) { assert index >= 0 : "index (" + index + ") should >= 0"; assert index < length : "index (" + index + ") should < length (" + length + ")"; -return Platform.getLong(baseObj, baseOffset + index * WIDTH); +return memory.getLong(memory.getBaseOffset() + index * WIDTH); --- End diff -- Ditto: this should also use a 0-based offset.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177295038 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java --- @@ -80,7 +76,7 @@ public void zeroOut() { public void set(int index, long value) { assert index >= 0 : "index (" + index + ") should >= 0"; assert index < length : "index (" + index + ") should < length (" + length + ")"; -Platform.putLong(baseObj, baseOffset + index * WIDTH, value); --- End diff -- Update it to use a 0-based offset.
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r177294950 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/array/LongArray.java --- @@ -69,8 +64,9 @@ public long size() { * Fill this all with 0L. */ public void zeroOut() { +long baseOffset = memory.getBaseOffset(); for (long off = baseOffset; off < baseOffset + length * WIDTH; off += WIDTH) { --- End diff -- The `off` should start at 0.
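A minimal sketch of what 0-based offsets mean for `LongArray`, assuming a hypothetical `WordBlock` whose `getLong`/`putLong` take offsets relative to the block and add the base offset internally (the contract the review asks `MemoryBlock` to document). With that contract, `zeroOut`, `set`, and `get` use `index * WIDTH` directly; adding `getBaseOffset()` again double-counts it and runs past the end of the backing array. `SimpleLongArray` is likewise hypothetical, not Spark's class:

```java
// Hypothetical block whose accessors take 0-based logical byte offsets;
// the base offset is added internally.
final class WordBlock {
    private final long[] words;
    private final long baseOffset; // physical byte offset of logical offset 0

    WordBlock(long[] words, long baseOffset) {
        this.words = words;
        this.baseOffset = baseOffset;
    }

    long getBaseOffset() { return baseOffset; }

    long size() { return words.length * 8L - baseOffset; }

    long getLong(long off) { return words[(int) ((baseOffset + off) / 8)]; }

    void putLong(long off, long v) { words[(int) ((baseOffset + off) / 8)] = v; }
}

// Hypothetical LongArray-like wrapper that only ever passes logical offsets.
final class SimpleLongArray {
    private static final long WIDTH = 8;
    private final WordBlock memory;
    private final long length; // number of longs

    SimpleLongArray(WordBlock memory) {
        this.memory = memory;
        this.length = memory.size() / WIDTH;
    }

    // Offsets are relative to the block, so the loop starts at 0, not at getBaseOffset().
    void zeroOut() {
        for (long off = 0; off < length * WIDTH; off += WIDTH) {
            memory.putLong(off, 0L);
        }
    }

    void set(int index, long value) { memory.putLong(index * WIDTH, value); }

    long get(int index) { return memory.getLong(index * WIDTH); }
}
```

With a base offset of 8, logical index 0 maps to the second backing word, and the word before the block is never touched.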
[GitHub] spark pull request #20908: [SPARK-23672][PYTHON] Document support for nested...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20908#discussion_r177294401 --- Diff: python/pyspark/sql/tests.py --- @@ -3966,6 +3967,15 @@ def random_udf(v): random_udf = random_udf.asNondeterministic() return random_udf +def test_pandas_udf_tokenize(self): +from pyspark.sql.functions import pandas_udf +tokenize = pandas_udf(lambda s: s.apply(lambda str: str.split(' ')), --- End diff -- Hm. I thought this PR aims to clarify array types with primitive types. Can we improve the test case here, https://github.com/holdenk/spark/blob/342d2228a5c68fd2c07bd8c1b518da6135ce1bf6/python/pyspark/sql/tests.py#L3998, and remove this test case?
[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/20910#discussion_r177293958 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s + +import io.fabric8.kubernetes.api.model.Pod + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource} +import org.apache.spark.internal.config.ConfigEntry + +private[spark] sealed trait KubernetesRoleSpecificConf + +private[spark] case class KubernetesDriverSpecificConf( + mainAppResource: Option[MainAppResource], + mainClass: String, + appName: String, + appArgs: Seq[String]) extends KubernetesRoleSpecificConf + +private[spark] case class KubernetesExecutorSpecificConf( + executorId: String, driverPod: Pod) + extends KubernetesRoleSpecificConf + +private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf]( + val sparkConf: SparkConf, + val roleSpecificConf: T, + val appResourceNamePrefix: String, + val appId: String, + val roleLabels: Map[String, String], + val roleAnnotations: Map[String, String], + val roleSecretNamesToMountPaths: Map[String, String], + val roleEnvs: Map[String, String]) { + + def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE) + + def sparkJars(): Seq[String] = sparkConf +.getOption("spark.jars") +.map(str => str.split(",").toSeq) +.getOrElse(Seq.empty[String]) + + def sparkFiles(): Seq[String] = sparkConf +.getOption("spark.files") +.map(str => str.split(",").toSeq) +.getOrElse(Seq.empty[String]) + + def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY) + + def nodeSelector(): Map[String, String] = +KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX) + + def get[T](config: ConfigEntry[T]): T = sparkConf.get(config) + + def get(conf: String, defaultValue: String): String = sparkConf.get(conf, defaultValue) + + def getOption(key: String): Option[String] = sparkConf.getOption(key) + +} + + +private[spark] object KubernetesConf { + def createDriverConf( +sparkConf: SparkConf, 
+appName: String, +appResourceNamePrefix: String, +appId: String, +mainAppResource: Option[MainAppResource], +mainClass: String, +appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = { +val sparkConfWithMainAppJar = sparkConf.clone() +mainAppResource.foreach { + case JavaMainAppResource(res) => +val previousJars = sparkConf + .getOption("spark.jars") + .map(_.split(",")) + .getOrElse(Array.empty) +if (!previousJars.contains(res)) { + sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) +} +} + +val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_DRIVER_LABEL_PREFIX) +require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " + + s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " + + "operations.") +require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " + + s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " + + "operations.") +val driverLabels = driverCustomLabels ++ Map( + SPARK_APP_ID_LABEL -> appId, + SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) +val driverAnnotations = + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX) +
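The driver-side logic quoted above does two concrete things. It appends the main application JAR to `spark.jars` only when the JAR is not already listed. It also rejects user-supplied driver labels that collide with the keys Spark reserves for bookkeeping. The following is a minimal Python sketch of that behavior, not the Scala implementation. The function names are hypothetical. The sketch treats any non-`None` main resource as a JAR, whereas the Scala code only does this for `JavaMainAppResource`. The literal label keys and the `"driver"` role value are assumptions standing in for `SPARK_APP_ID_LABEL`, `SPARK_ROLE_LABEL`, and `SPARK_POD_DRIVER_ROLE`.

```python
# Hypothetical Python sketch of two checks from KubernetesConf.createDriverConf.
# ASSUMPTION: these literal values stand in for the Scala constants
# SPARK_APP_ID_LABEL, SPARK_ROLE_LABEL and SPARK_POD_DRIVER_ROLE.
SPARK_APP_ID_LABEL = "spark-app-selector"
SPARK_ROLE_LABEL = "spark-role"
SPARK_POD_DRIVER_ROLE = "driver"


def jars_with_main_resource(conf, main_resource):
    """Return spark.jars as a list, appending main_resource if it is not already listed."""
    raw = conf.get("spark.jars")
    previous = raw.split(",") if raw else []
    if main_resource is not None and main_resource not in previous:
        return previous + [main_resource]
    return previous


def driver_labels(custom_labels, app_id):
    """Merge user labels with Spark's reserved labels, rejecting collisions."""
    for key in (SPARK_APP_ID_LABEL, SPARK_ROLE_LABEL):
        if key in custom_labels:
            raise ValueError(
                f"Label with key {key} is not allowed as it is reserved "
                "for Spark bookkeeping operations.")
    # Reserved labels are merged last, like `driverCustomLabels ++ Map(...)`.
    return {**custom_labels,
            SPARK_APP_ID_LABEL: app_id,
            SPARK_ROLE_LABEL: SPARK_POD_DRIVER_ROLE}
```

Because the reserved keys are checked before merging, a user label can never silently overwrite the app ID or role. The explicit `require` in the Scala code turns such a collision into an immediate error.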
[GitHub] spark pull request #20908: [SPARK-23672][PYTHON] Document support for nested...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20908#discussion_r177293289 --- Diff: python/pyspark/sql/tests.py --- @@ -3806,6 +3806,7 @@ def test_timestamp_dst(self): not _have_pandas or not _have_pyarrow, _pandas_requirement_message or _pyarrow_requirement_message) class PandasUDFTests(ReusedSQLTestCase): + --- End diff -- seems unrelated ..
[GitHub] spark pull request #20784: [SPARK-23639][SQL]Obtain token before init metast...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20784#discussion_r177291761 --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala --- @@ -77,6 +80,12 @@ private[hive] object SparkSQLCLIDriver extends Logging { }) } + private def isSecuredAndProxy(hiveConf: HiveConf): Boolean = { --- End diff -- Isn't this basically `HiveDelegationTokenProvider.delegationTokensRequired`? Doesn't it work if you just call that method? The only difference is the check for deploy mode, which should work fine in this context.
[GitHub] spark issue #20784: [SPARK-23639][SQL]Obtain token before init metastore cli...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20784 cc @jerryshao
[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/20910#discussion_r177290859 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s + +import io.fabric8.kubernetes.api.model.Pod + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource} +import org.apache.spark.internal.config.ConfigEntry + +private[spark] sealed trait KubernetesRoleSpecificConf + +private[spark] case class KubernetesDriverSpecificConf( + mainAppResource: Option[MainAppResource], + mainClass: String, + appName: String, + appArgs: Seq[String]) extends KubernetesRoleSpecificConf + +private[spark] case class KubernetesExecutorSpecificConf( + executorId: String, driverPod: Pod) + extends KubernetesRoleSpecificConf + +private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf]( + val sparkConf: SparkConf, + val roleSpecificConf: T, + val appResourceNamePrefix: String, + val appId: String, + val roleLabels: Map[String, String], + val roleAnnotations: Map[String, String], + val roleSecretNamesToMountPaths: Map[String, String]) { + + def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE) + + def sparkJars(): Seq[String] = sparkConf +.getOption("spark.jars") +.map(str => str.split(",").toSeq) +.getOrElse(Seq.empty[String]) + + def sparkFiles(): Seq[String] = sparkConf +.getOption("spark.files") +.map(str => str.split(",").toSeq) +.getOrElse(Seq.empty[String]) + + def driverCustomEnvs(): Seq[(String, String)] = +sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq + + def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY) + + def nodeSelector(): Map[String, String] = +KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX) + + def get[T](config: ConfigEntry[T]): T = sparkConf.get(config) + + def get(conf: String, defaultValue: String): String = sparkConf.get(conf, defaultValue) + + def getOption(key: String): Option[String] = sparkConf.getOption(key) + --- End diff -- Oh, I meant removing the extra new line.
[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/20910#discussion_r177290805 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala --- @@ -14,17 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.k8s.submit.steps +package org.apache.spark.deploy.k8s -import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec +import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder} -/** - * Represents a step in configuring the Spark driver pod. - */ -private[spark] trait DriverConfigurationStep { +private[spark] case class SparkPod(pod: Pod, container: Container) - /** - * Apply some transformation to the previous state of the driver to add a new feature to it. - */ - def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec +private[spark] object SparkPod { + def initialPod(): SparkPod = { +SparkPod( + new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(), --- End diff -- Got it.
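The `SparkPod` case class in this diff pairs a pod with its Spark container, replacing the driver-only `DriverConfigurationStep.configureDriver` signature. This lets pod-building logic be written as transformations over a single value. A rough Python analogue follows. It assumes pods and containers are modeled as plain dicts and uses hypothetical step functions (`with_label`, `with_image`, `build_pod`) that do not come from the PR.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class SparkPod:
    """A pod plus the Spark container inside it (both modeled as plain dicts)."""
    pod: dict
    container: dict


def initial_pod():
    # Empty metadata and spec, loosely mirroring
    # new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build()
    return SparkPod(pod={"metadata": {}, "spec": {}}, container={})


def with_label(key, value):
    """Hypothetical step that adds a pod label without mutating its input."""
    def step(sp):
        labels = {**sp.pod["metadata"].get("labels", {}), key: value}
        metadata = {**sp.pod["metadata"], "labels": labels}
        return replace(sp, pod={**sp.pod, "metadata": metadata})
    return step


def with_image(image):
    """Hypothetical step that sets the container image."""
    def step(sp):
        return replace(sp, container={**sp.container, "image": image})
    return step


def build_pod(steps):
    """Apply each step in order, starting from an empty pod."""
    sp = initial_pod()
    for step in steps:
        sp = step(sp)
    return sp
```

Each step returns a new `SparkPod` instead of mutating its input. This is one way driver and executor builders could share the same steps. The excerpt does not show whether the PR's feature steps have exactly this shape.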
[GitHub] spark issue #20784: [SPARK-23639][SQL]Obtain token before init metastore cli...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20784 Merged build finished. Test PASSed.
[GitHub] spark issue #20784: [SPARK-23639][SQL]Obtain token before init metastore cli...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20784 **[Test build #88615 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88615/testReport)** for PR 20784 at commit [`5c4335b`](https://github.com/apache/spark/commit/5c4335b28fc94a406ca52f60b549e3931668de08).
[GitHub] spark issue #20784: [SPARK-23639][SQL]Obtain token before init metastore cli...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20784 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1772/
[GitHub] spark issue #20906: [SPARK-23561][SS] Pull continuous processing out of Writ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20906 **[Test build #88614 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88614/testReport)** for PR 20906 at commit [`4a7a4cc`](https://github.com/apache/spark/commit/4a7a4cc4aef2e27025e019d8dde29c30026cb330).
[GitHub] spark pull request #20906: [SPARK-23561][SS] Pull continuous processing out ...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20906#discussion_r177288972 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.util.control.NonFatal + +import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory} +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo} +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.util.Utils + +case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) --- End diff -- Sure, works for me.
[GitHub] spark issue #20784: [SPARK-23639][SQL]Obtain token before init metastore cli...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20784 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1771/
[GitHub] spark issue #20784: [SPARK-23639][SQL]Obtain token before init metastore cli...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20784 Merged build finished. Test PASSed.
[GitHub] spark issue #20906: [SPARK-23561][SS] Pull continuous processing out of Writ...
Github user jose-torres commented on the issue: https://github.com/apache/spark/pull/20906 All that should be left before removing the inheritance is to introduce a continuous version of the logical plan WriteToDataSourceV2. I wanted to do that in a separate PR, since there's already a lot going on here.
[GitHub] spark issue #20784: [SPARK-23639][SQL]Obtain token before init metastore cli...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20784 **[Test build #88613 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88613/testReport)** for PR 20784 at commit [`ba6b086`](https://github.com/apache/spark/commit/ba6b0860d808f276537da9ee4e8e0800a8f42bbd).
[GitHub] spark pull request #20784: [SPARK-23639][SQL]Obtain token before init metast...
Github user yaooqinn commented on a diff in the pull request: https://github.com/apache/spark/pull/20784#discussion_r177288126 --- Diff: sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala --- @@ -121,6 +134,25 @@ private[hive] object SparkSQLCLIDriver extends Logging { } } +if (isSecuredAndProxy(conf)) { + val currentUser = UserGroupInformation.getCurrentUser + try { +SparkHadoopUtil.get.doAsRealUser { --- End diff -- updated
[GitHub] spark issue #20784: [SPARK-23639][SQL]Obtain token before init metastore cli...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20784 Merged build finished. Test FAILed.
[GitHub] spark issue #20784: [SPARK-23639][SQL]Obtain token before init metastore cli...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20784 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88612/
[GitHub] spark issue #20784: [SPARK-23639][SQL]Obtain token before init metastore cli...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20784 **[Test build #88612 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88612/testReport)** for PR 20784 at commit [`4063855`](https://github.com/apache/spark/commit/40638552d0e252b62ac0c6d97376f0d2e66730e6). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20784: [SPARK-23639][SQL]Obtain token before init metastore cli...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20784 **[Test build #88612 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88612/testReport)** for PR 20784 at commit [`4063855`](https://github.com/apache/spark/commit/40638552d0e252b62ac0c6d97376f0d2e66730e6).
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20910 Kubernetes integration test status failure URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/1747/
[GitHub] spark issue #20893: [SPARK-23785][LAUNCHER] LauncherBackend doesn't check st...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20893 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88604/
[GitHub] spark issue #20893: [SPARK-23785][LAUNCHER] LauncherBackend doesn't check st...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20893 Merged build finished. Test FAILed.
[GitHub] spark issue #20893: [SPARK-23785][LAUNCHER] LauncherBackend doesn't check st...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20893 **[Test build #88604 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88604/testReport)** for PR 20893 at commit [`4ca8a32`](https://github.com/apache/spark/commit/4ca8a32e2a518f3c7ccecd406a8b03eac06f860b). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #20906: [SPARK-23561][SS] Pull continuous processing out ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20906#discussion_r177244722 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.util.control.NonFatal + +import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory} +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo} +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.util.Utils + +case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) +extends SparkPlan with Logging { + override def children: Seq[SparkPlan] = Seq(query) + override def output: Seq[Attribute] = Nil + + override protected def doExecute(): RDD[InternalRow] = { +val writerFactory = writer match { + case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory() + case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema) +} + +val rdd = query.execute() +val messages = new Array[WriterCommitMessage](rdd.partitions.length) + +logInfo(s"Start processing data source writer: $writer. " + + s"The input RDD has ${messages.length} partitions.") +EpochCoordinatorRef.get( --- End diff -- nit: Add comment on what this does.
[GitHub] spark pull request #20906: [SPARK-23561][SS] Pull continuous processing out ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20906#discussion_r177245022 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.util.control.NonFatal + +import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory} +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo} +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.util.Utils + +case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) --- End diff -- Isn't it better to keep the name consistent with WriterToDataSource? Say, WriteToContinuousDataSourceExec?
[GitHub] spark pull request #20906: [SPARK-23561][SS] Pull continuous processing out ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20906#discussion_r177275489 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.util.control.NonFatal + +import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory} +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo} +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.util.Utils + +case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) +extends SparkPlan with Logging { + override def children: Seq[SparkPlan] = Seq(query) + override def output: Seq[Attribute] = Nil + + override protected def doExecute(): RDD[InternalRow] = { +val writerFactory = writer match { + case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory() + case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema) +} + +val rdd = query.execute() +val messages = new Array[WriterCommitMessage](rdd.partitions.length) + +logInfo(s"Start processing data source writer: $writer. " + + s"The input RDD has ${messages.length} partitions.") +EpochCoordinatorRef.get( + sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), + sparkContext.env) + .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions)) --- End diff -- nit: this indentation looks weird.
[GitHub] spark pull request #20906: [SPARK-23561][SS] Pull continuous processing out ...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20906#discussion_r177243246 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteExec.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.util.control.NonFatal + +import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory} +import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo} +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter +import org.apache.spark.util.Utils + +case class ContinuousWriteExec(writer: StreamWriter, query: SparkPlan) --- End diff -- add docs
[GitHub] spark issue #20906: [SPARK-23561][SS] Pull continuous processing out of Writ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20906 **[Test build #88611 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88611/testReport)** for PR 20906 at commit [`26c1ead`](https://github.com/apache/spark/commit/26c1eadc67ffc48bcaf877154660982455892389).
[GitHub] spark issue #20906: [SPARK-23561][SS] Pull continuous processing out of Writ...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20906 jenkins, retest this please
[GitHub] spark pull request #20858: [SPARK-23736][SQL] Extending the concat function ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20858#discussion_r177279540 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -287,3 +289,152 @@ case class ArrayContains(left: Expression, right: Expression) override def prettyName: String = "array_contains" } + +/** + * Concatenates multiple arrays into one. + */ +@ExpressionDescription( + usage = "_FUNC_(expr, ...) - Concatenates multiple arrays into one.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6)); + [1,2,3,4,5,6] + """) +case class ConcatArrays(children: Seq[Expression]) extends Expression with NullSafeEvaluation { + + override def checkInputDataTypes(): TypeCheckResult = { +val arrayCheck = checkInputDataTypesAreArrays +if(arrayCheck.isFailure) arrayCheck +else TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), s"function $prettyName") + } + + private def checkInputDataTypesAreArrays(): TypeCheckResult = + { +val mismatches = children.zipWithIndex.collect { + case (child, idx) if !ArrayType.acceptsType(child.dataType) => +s"argument ${idx + 1} has to be ${ArrayType.simpleString} type, " + + s"however, '${child.sql}' is of ${child.dataType.simpleString} type." +} + +if (mismatches.isEmpty) { + TypeCheckResult.TypeCheckSuccess +} else { + TypeCheckResult.TypeCheckFailure(mismatches.mkString(" ")) +} + } + + override def dataType: ArrayType = +children + .headOption.map(_.dataType.asInstanceOf[ArrayType]) + .getOrElse(ArrayType.defaultConcreteType.asInstanceOf[ArrayType]) --- End diff -- Hm .. but then this is `array` when the children are empty. Seems `CreateArray`'s type is `array` in this case.
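The `checkInputDataTypesAreArrays` and `dataType` logic in this diff comes down to three rules. Every child must be an array. Each offending argument is reported by its 1-based position. The result type is the first child's type, or a default when there are no children. Below is a minimal Python sketch of those rules over hypothetical `(sql_text, type_string)` pairs. The default `"array<null>"` is an assumption about what `ArrayType.defaultConcreteType` renders to. The string-prefix test is a simplification of `ArrayType.acceptsType`. The follow-up same-type check (`checkForSameTypeInputExpr`) is not modeled.

```python
def check_inputs_are_arrays(children):
    """children: list of (sql_text, type_string) pairs.

    Returns None when every child is an array, otherwise one message listing
    each offending argument by its 1-based position.
    """
    mismatches = [
        f"argument {idx + 1} has to be array type, "
        f"however, '{sql}' is of {dtype} type."
        for idx, (sql, dtype) in enumerate(children)
        # Simplification: treat any "array<...>" type string as an array.
        if not dtype.startswith("array")
    ]
    return " ".join(mismatches) if mismatches else None


def result_type(child_types, default="array<null>"):
    """The first child's type, or the (assumed) default when there are no children."""
    return child_types[0] if child_types else default
```

The empty-children case is exactly where the review question arises. `result_type([])` falls back to the default, so the result type of a zero-argument call depends entirely on which default is chosen.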
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20910 Merged build finished. Test PASSed.
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20910 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88610/ Test PASSed.
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20910 **[Test build #88610 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88610/testReport)** for PR 20910 at commit [`f3540f8`](https://github.com/apache/spark/commit/f3540f8a9032ff1ffa8d98f764a2f16b5ac289f2). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #20858: [SPARK-23736][SQL] Extending the concat function ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20858#discussion_r177278880 --- Diff: python/pyspark/sql/functions.py --- @@ -1834,6 +1819,25 @@ def array_contains(col, value): return Column(sc._jvm.functions.array_contains(_to_java_column(col), value)) +@since(1.5) +@ignore_unicode_prefix +def concat(*cols): +""" +Concatenates multiple input columns together into a single column. +The function works with strings, binary columns and arrays of the same time. + +>>> df = spark.createDataFrame([('abcd','123')], ['s', 'd']) +>>> df.select(concat(df.s, df.d).alias('s')).collect() +[Row(s=u'abcd123')] + +>>> df = spark.createDataFrame([([1, 2], [3, 4], [5]), ([1, 2], None, [3])], ['a', 'b', 'c']) +>>> df.select(concat(df.a, df.b, df.c).alias("arr")).collect() +[Row(arr=[1, 2, 3, 4, 5]), Row(arr=None)] +""" +sc = SparkContext._active_spark_context +return Column(sc._jvm.functions.concat(_to_seq(sc, cols, _to_java_column))) --- End diff -- Why did we move this down .. ?
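The second doctest in the quoted docstring shows null propagation for the array case: a row in which any input array is null yields a null result (`Row(arr=None)`). Below is a plain-Python model of that behavior. It is a sketch inferred from the doctest output, not PySpark's implementation, and `concat_arrays` is a hypothetical name.

```python
from typing import List, Optional, Sequence


def concat_arrays(*arrays: Optional[Sequence]) -> Optional[List]:
    """Concatenate arrays left to right; return None if any input is None."""
    if any(a is None for a in arrays):
        return None  # null in, null out
    result: List = []
    for a in arrays:
        result.extend(a)
    return result
```

For example, `concat_arrays([1, 2], [3, 4], [5])` gives `[1, 2, 3, 4, 5]`, while `concat_arrays([1, 2], None, [3])` gives `None`, matching the two rows in the doctest.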
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20910 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1770/ Test PASSed.
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20910 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/1747/
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20910 Merged build finished. Test PASSed.
[GitHub] spark pull request #20858: [SPARK-23736][SQL] Extending the concat function ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20858#discussion_r177278671 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -699,3 +699,88 @@ abstract class TernaryExpression extends Expression { * and Hive function wrappers. */ trait UserDefinedExpression + +/** + * The trait covers logic for performing null save evaluation and code generation. + */ +trait NullSafeEvaluation extends Expression +{ + override def foldable: Boolean = children.forall(_.foldable) + + override def nullable: Boolean = children.exists(_.nullable) + + /** + * Default behavior of evaluation according to the default nullability of NullSafeEvaluation. + * If a class utilizing NullSaveEvaluation override [[nullable]], probably should also + * override this. + */ + override def eval(input: InternalRow): Any = + { --- End diff -- Seems the style fix is missed here.
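The two overrides quoted from the `NullSafeEvaluation` trait derive `foldable` and `nullable` from the children. Here is a plain-Python model of those two rules. `Expr` and `NullSafeExpr` are hypothetical names. The null short-circuit in `eval` is an assumption based on the doc comment's reference to "default nullability", because the actual `eval` body is cut off in the diff.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class Expr:
    """Hypothetical stand-in for a child expression and its evaluated value."""
    foldable: bool
    nullable: bool
    value: Any


@dataclass
class NullSafeExpr:
    """Hypothetical model of an expression mixing in NullSafeEvaluation."""
    children: List[Expr]
    combine: Callable[[List[Any]], Any]  # applied only when no input is null

    @property
    def foldable(self) -> bool:
        # Mirrors children.forall(_.foldable)
        return all(c.foldable for c in self.children)

    @property
    def nullable(self) -> bool:
        # Mirrors children.exists(_.nullable)
        return any(c.nullable for c in self.children)

    def eval(self) -> Any:
        values = [c.value for c in self.children]
        # Assumption: any null input short-circuits to a null result.
        if any(v is None for v in values):
            return None
        return self.combine(values)
```

The design consequence, as the doc comment warns, is that a subclass that overrides `nullable` to return `false` would also need to change `eval`, since the default short-circuit can still return null.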
[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/20910#discussion_r177277920 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s + +import io.fabric8.kubernetes.api.model.Pod + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource} +import org.apache.spark.internal.config.ConfigEntry + +private[spark] sealed trait KubernetesRoleSpecificConf + +private[spark] case class KubernetesDriverSpecificConf( + mainAppResource: Option[MainAppResource], + mainClass: String, + appName: String, + appArgs: Seq[String]) extends KubernetesRoleSpecificConf + +private[spark] case class KubernetesExecutorSpecificConf( + executorId: String, driverPod: Pod) + extends KubernetesRoleSpecificConf + +private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf]( + val sparkConf: SparkConf, + val roleSpecificConf: T, + val appResourceNamePrefix: String, + val appId: String, + val roleLabels: Map[String, String], + val roleAnnotations: Map[String, String], + val roleSecretNamesToMountPaths: Map[String, String]) { + + def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE) + + def sparkJars(): Seq[String] = sparkConf +.getOption("spark.jars") +.map(str => str.split(",").toSeq) +.getOrElse(Seq.empty[String]) + + def sparkFiles(): Seq[String] = sparkConf +.getOption("spark.files") +.map(str => str.split(",").toSeq) +.getOrElse(Seq.empty[String]) + + def driverCustomEnvs(): Seq[(String, String)] = +sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq + + def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY) + + def nodeSelector(): Map[String, String] = +KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX) + + def get[T](config: ConfigEntry[T]): T = sparkConf.get(config) + + def get(conf: String, defaultValue: String): String = sparkConf.get(conf, defaultValue) + + def getOption(key: String): Option[String] = sparkConf.getOption(key) + --- End diff -- 
Sorry, do you mean we should remove this newline or that one should be added here?
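`sparkJars()` and `sparkFiles()` in the quoted `KubernetesConf` read a comma-separated property and fall back to an empty sequence when it is unset. Here is a plain-Python sketch of that parsing. A dict stands in for `SparkConf`, and `split_conf_list` is a hypothetical helper. Edge cases such as trailing commas are not guaranteed to match Scala's `String.split`.

```python
from typing import Dict, List


def split_conf_list(conf: Dict[str, str], key: str) -> List[str]:
    """Split a comma-separated property; an unset key yields an empty list."""
    value = conf.get(key)  # like sparkConf.getOption(key)
    if value is None:
        return []  # like .getOrElse(Seq.empty[String])
    return value.split(",")  # pieces are not trimmed
```

For example, `split_conf_list({"spark.jars": "a.jar,b.jar"}, "spark.jars")` gives `["a.jar", "b.jar"]`, and an empty conf gives `[]`.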
[GitHub] spark issue #20900: [SPARK-23645][MINOR][DOCS][PYTHON] Add docs RE `pandas_u...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20900 The issue itself (SPARK-23645) describes support for keyword arguments on the calling side in both UDFs and Pandas UDFs. That does not seem to work, but the fix looks like it would be quite invasive and big, so I suggested fixing the documentation for now. Maybe we should revisit this in the future; let's monitor the mailing list and JIRAs. https://github.com/apache/spark/pull/20900#issuecomment-376356469 together with https://github.com/apache/spark/pull/20900#issuecomment-376357750 is a separate issue, which I found during review, about partial functions and callable objects in Pandas UDFs.
[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/20910#discussion_r177277876 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import io.fabric8.kubernetes.api.model.Pod + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource} +import org.apache.spark.internal.config.ConfigEntry + +private[spark] sealed trait KubernetesRoleSpecificConf + +private[spark] case class KubernetesDriverSpecificConf( + mainAppResource: Option[MainAppResource], + mainClass: String, + appName: String, + appArgs: Seq[String]) extends KubernetesRoleSpecificConf + +private[spark] case class KubernetesExecutorSpecificConf( + executorId: String, driverPod: Pod) + extends KubernetesRoleSpecificConf + +private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf]( --- End diff -- Maybe should be a `case class`? 
This seems like a struct-like object, which inclines me to think a `case class` is more idiomatic here.
[GitHub] spark issue #20858: [SPARK-23736][SQL] Extending the concat function to supp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20858 Merged build finished. Test FAILed.
[GitHub] spark issue #20858: [SPARK-23736][SQL] Extending the concat function to supp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20858 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88605/ Test FAILed.
[GitHub] spark issue #20858: [SPARK-23736][SQL] Extending the concat function to supp...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20858 **[Test build #88605 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88605/testReport)** for PR 20858 at commit [`753499d`](https://github.com/apache/spark/commit/753499d1784bdaf7c96f67dfbc3d0ff5c1e955a9). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20910 **[Test build #88610 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88610/testReport)** for PR 20910 at commit [`f3540f8`](https://github.com/apache/spark/commit/f3540f8a9032ff1ffa8d98f764a2f16b5ac289f2).
[GitHub] spark issue #20900: [SPARK-23645][MINOR][DOCS][PYTHON] Add docs RE `pandas_u...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20900 To be clear, I think both of the functions below

```python
class F(object):
    def __call__(self, *args):  # arguments elided in the original comment
        ...

func = F()
```

```python
from functools import partial

def naive_func(a, b):
    ...

func = partial(naive_func, a=1)
```

should work with Pandas UDF, but they do not seem to work given my test https://github.com/apache/spark/pull/20900#issuecomment-375949480
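Both kinds of callables in the comment look different from a plain function to signature introspection. Below is a stdlib-only sketch that does not touch PySpark. Whether this difference is the actual cause of the Pandas UDF failure is an assumption that should be checked against the linked test. `inspect.signature` drops `self` for a callable object. `functools.partial(..., a=1)` turns `a` and every later parameter into keyword-only ones, so a count of positional parameters sees zero. Neither object has a `__name__`. `positional_params` is a hypothetical helper, and the parameters of `F.__call__` are filled in for illustration.

```python
import inspect
from functools import partial


class F(object):
    def __call__(self, a, b):  # concrete parameters chosen for illustration
        return a + b


def naive_func(a, b):
    return a + b


def positional_params(f):
    """Names of parameters that can be passed positionally."""
    kinds = (inspect.Parameter.POSITIONAL_ONLY,
             inspect.Parameter.POSITIONAL_OR_KEYWORD)
    return [p.name for p in inspect.signature(f).parameters.values()
            if p.kind in kinds]


callable_obj = F()
partial_func = partial(naive_func, a=1)

print(positional_params(naive_func))    # ['a', 'b']
print(positional_params(callable_obj))  # ['a', 'b']  (self is not included)
print(positional_params(partial_func))  # []  (a and b became keyword-only)
print(hasattr(callable_obj, "__name__"), hasattr(partial_func, "__name__"))  # False False
```

Note that `partial_func(b=2)` works and returns `3`, but `partial_func(2)` raises `TypeError`, because `a` is already bound by keyword.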
[GitHub] spark pull request #20836: SPARK-23685 : Fix for the Spark Structured Stream...
Github user sirishaSindri commented on a diff in the pull request: https://github.com/apache/spark/pull/20836#discussion_r177276642 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -279,9 +279,8 @@ private[kafka010] case class InternalKafkaConsumer( if (record.offset > offset) { // This may happen when some records aged out but their offsets already got verified if (failOnDataLoss) { - reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.offset})") --- End diff -- @gaborgsomogyi Thank you for looking at it. A batch query always fails if it fails to read any data from the provided offsets due to lost data (see https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html). With this change, it won't fail; instead, it will return all the available messages within the requested offset range.
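Here is a simplified, hypothetical model of the two behaviors being contrasted. It is not the `KafkaDataConsumer` code: `fetch_range`, `DataLossError` and the dict-of-offsets log are stand-ins. When some records in `[start, end)` have aged out, `fail_on_data_loss=True` raises, while `False` returns whatever is still available in the range.

```python
from typing import Dict, List


class DataLossError(Exception):
    """Hypothetical error raised when requested offsets are gone."""


def fetch_range(log: Dict[int, str], start: int, end: int,
                fail_on_data_loss: bool) -> List[str]:
    """Return records with offsets in [start, end) from a log keyed by offset."""
    missing = [o for o in range(start, end) if o not in log]
    if missing and fail_on_data_loss:
        raise DataLossError(f"Cannot fetch offsets {missing}")
    # Otherwise skip the gaps and return what is still available.
    return [log[o] for o in range(start, end) if o in log]
```

With `log = {3: "c", 4: "d", 5: "e"}` (offsets 0 to 2 aged out), `fetch_range(log, 0, 5, fail_on_data_loss=False)` returns `["c", "d"]`, and the same call with `True` raises.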
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20910 Kubernetes integration test status failure URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/1746/
[GitHub] spark issue #20900: [SPARK-23645][MINOR][DOCS][PYTHON] Add docs RE `pandas_u...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20900 @icexelloss, yup, ^ is correct. IIRC, we have some separate tests for normal UDFs with callable objects and partial functions, but the problem seems to be in Pandas UDF. I think the fix itself will be relatively minimal (just a wild guess on my part). Would you be interested in doing this?
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20910 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88609/ Test PASSed.
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20910 Merged build finished. Test PASSed.
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20910 **[Test build #88609 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88609/testReport)** for PR 20910 at commit [`27b8634`](https://github.com/apache/spark/commit/27b86346a9b227a773f26aa1268096b2ec53f956). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20910 Kubernetes integration test starting URL: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-prb-spark-integration/1746/
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20910 Merged build finished. Test PASSed.
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20910 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1769/ Test PASSed.
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20910 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88608/ Test PASSed.
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20910 Merged build finished. Test PASSed.
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20910 **[Test build #88608 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88608/testReport)** for PR 20910 at commit [`4c944c4`](https://github.com/apache/spark/commit/4c944c4863a0fe625d59354a30cf90e33d9ce7df). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/20910#discussion_r177273537 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala --- @@ -180,20 +167,17 @@ private[spark] class Client( } // Build a Config Map that will house spark conf properties in a single file for spark-submit - private def buildConfigMap(configMapName: String, conf: SparkConf): ConfigMap = { + private def buildConfigMap(configMapName: String, conf: Map[String, String]): ConfigMap = { val properties = new Properties() -conf.getAll.foreach { case (k, v) => +conf.foreach { case (k, v) => properties.setProperty(k, v) } val propertiesWriter = new StringWriter() properties.store(propertiesWriter, s"Java properties built from Kubernetes config map with name: $configMapName") - -val namespace = conf.get(KUBERNETES_NAMESPACE) new ConfigMapBuilder() .withNewMetadata() .withName(configMapName) -.withNamespace(namespace) --- End diff -- It's not necessary to set namespaces on these objects because the kubernetes client itself is namespaced.
[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/20910#discussion_r177273458 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPod.scala --- @@ -14,17 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.k8s.submit.steps +package org.apache.spark.deploy.k8s -import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec +import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder} -/** - * Represents a step in configuring the Spark driver pod. - */ -private[spark] trait DriverConfigurationStep { +private[spark] case class SparkPod(pod: Pod, container: Container) - /** - * Apply some transformation to the previous state of the driver to add a new feature to it. - */ - def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec +private[spark] object SparkPod { + def initialPod(): SparkPod = { +SparkPod( + new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(), --- End diff -- Sort of. It allows everything that consumes one of these to use `.editMetadata()` or `editOrNewMetadata` when creating features. If you don't initialize the metadata and spec, and a downstream caller then invokes `editMetadata`, we throw an NPE.
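Here is a plain-Python analogy of the point about `initialPod()`. The classes are hypothetical and do not model the fabric8 builder API. If a nested object starts out as `None`, an "edit"-style step that assumes it exists fails. Initializing empty metadata up front avoids this, and the same applies to the spec.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class Metadata:
    labels: Dict[str, str] = field(default_factory=dict)


@dataclass
class Pod:
    metadata: Optional[Metadata] = None  # unset unless explicitly initialized


def initial_pod() -> Pod:
    # Start with empty but non-None metadata, analogous to
    # withNewMetadata().endMetadata() in the quoted diff.
    return Pod(metadata=Metadata())


def add_label(pod: Pod, key: str, value: str) -> Pod:
    # An "edit"-style feature step: assumes metadata already exists.
    pod.metadata.labels[key] = value
    return pod
```

`add_label(initial_pod(), "role", "driver")` succeeds, while `add_label(Pod(), "role", "driver")` raises `AttributeError`. That is Python's counterpart of the NPE mentioned in the comment.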
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20910 **[Test build #88609 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88609/testReport)** for PR 20910 at commit [`27b8634`](https://github.com/apache/spark/commit/27b86346a9b227a773f26aa1268096b2ec53f956).
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20910 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1768/ Test FAILed.
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20910 **[Test build #88608 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88608/testReport)** for PR 20910 at commit [`4c944c4`](https://github.com/apache/spark/commit/4c944c4863a0fe625d59354a30cf90e33d9ce7df).
[GitHub] spark issue #20910: [SPARK-22839] [K8s] Refactor to unify driver and executo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20910 Merged build finished. Test FAILed.
[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/20910#discussion_r177267646 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder} + +import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher + +private[spark] class BasicDriverFeatureStep( + kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) --- End diff -- Should we rename this to `driverConf`?
[GitHub] spark pull request #20910: [SPARK-22839] [K8s] Refactor to unify driver and ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/20910#discussion_r177266945 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s + +import io.fabric8.kubernetes.api.model.Pod + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource} +import org.apache.spark.internal.config.ConfigEntry + +private[spark] sealed trait KubernetesRoleSpecificConf + +private[spark] case class KubernetesDriverSpecificConf( + mainAppResource: Option[MainAppResource], + mainClass: String, + appName: String, + appArgs: Seq[String]) extends KubernetesRoleSpecificConf + +private[spark] case class KubernetesExecutorSpecificConf( + executorId: String, driverPod: Pod) + extends KubernetesRoleSpecificConf + +private[spark] class KubernetesConf[T <: KubernetesRoleSpecificConf]( + val sparkConf: SparkConf, + val roleSpecificConf: T, + val appResourceNamePrefix: String, + val appId: String, + val roleLabels: Map[String, String], + val roleAnnotations: Map[String, String], + val roleSecretNamesToMountPaths: Map[String, String]) { + + def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE) + + def sparkJars(): Seq[String] = sparkConf +.getOption("spark.jars") +.map(str => str.split(",").toSeq) +.getOrElse(Seq.empty[String]) + + def sparkFiles(): Seq[String] = sparkConf +.getOption("spark.files") +.map(str => str.split(",").toSeq) +.getOrElse(Seq.empty[String]) + + def driverCustomEnvs(): Seq[(String, String)] = +sparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq + + def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY) + + def nodeSelector(): Map[String, String] = +KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX) + + def get[T](config: ConfigEntry[T]): T = sparkConf.get(config) + + def get(conf: String, defaultValue: String): String = sparkConf.get(conf, defaultValue) + + def getOption(key: String): Option[String] = sparkConf.getOption(key) + +} + +private[spark] 
object KubernetesConf { + def createDriverConf( +sparkConf: SparkConf, +appName: String, +appResourceNamePrefix: String, +appId: String, +mainAppResource: Option[MainAppResource], +mainClass: String, +appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = { +val sparkConfWithMainAppJar = sparkConf.clone() +mainAppResource.foreach { + case JavaMainAppResource(res) => +val previousJars = sparkConf + .getOption("spark.jars") + .map(_.split(",")) + .getOrElse(Array.empty) +if (!previousJars.contains(res)) { + sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) +} +} +val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( + sparkConf, + KUBERNETES_DRIVER_LABEL_PREFIX) +require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " + + s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " + + "operations.") +require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " + + s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " + + "operations.") +val driverLabels = driverCustomLabels ++ Map( + SPARK_APP_ID_LABEL -> appId, + SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) +val driverAnnotations = +