[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/20675 cc @tdas and @jose-torres #20225 gives a quick fix for task level retry; this is just an attempt at a possibly better implementation. Please let me know if I got something wrong or have misunderstood Continuous Processing. Thanks :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20675 **[Test build #87665 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87665/testReport)** for PR 20675 at commit [`21f574e`](https://github.com/apache/spark/commit/21f574e2a3ad3c8e68b92776d2a141d7fcb90502). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20675 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1053/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20675 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/20675 [SPARK-23033][SS][Follow Up] Task level retry for continuous processing ## What changes were proposed in this pull request? Here we want to reimplement the task level retry for continuous processing. The changes include: 1. Add a new `EpochCoordinatorMessage` named `GetLastEpochAndOffset`, which is used to get the last epoch and offset of a particular partition when a task restarts. 2. Add a `setOffset` function to `ContinuousDataReader`, so that the base reader can restart from a given offset. ## How was this patch tested? Added a new UT in `ContinuousSuite` and a new `StreamAction` named `CheckAnswerRowsContainsOnlyOnce` for more accurate result checking. You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-23033 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20675.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20675 commit 21f574e2a3ad3c8e68b92776d2a141d7fcb90502 Author: Yuanjian Li Date: 2018-02-26T07:27:10Z [SPARK-23033][SS][Follow Up] Task level retry for continuous processing --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
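As a rough, self-contained sketch of the shape such a change could take (only the names `GetLastEpochAndOffset` and `setOffset` come from the PR description above; every other name and type here is hypothetical, not taken from the actual patch):

```scala
// Hypothetical sketch, not the actual patch: illustrates the two pieces the PR
// description mentions for task-level retry in continuous processing.

// Stand-in for Spark's per-partition offset type.
trait PartitionOffset extends Serializable

// New EpochCoordinator RPC message: a restarted task asks the coordinator for
// the last epoch and committed offset of its partition.
case class GetLastEpochAndOffset(partitionId: Int)

// Hypothetical reply carrying the point the restarted task should resume from.
case class LastEpochAndOffset(epoch: Long, offset: Option[PartitionOffset])

// The per-partition continuous reader gains a way to be repositioned before it
// starts producing rows again.
trait RestartableContinuousDataReader[T] {
  /** Seek the underlying source so that the next read starts from `offset`. */
  def setOffset(offset: PartitionOffset): Unit
}
```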
[GitHub] spark issue #18581: [SPARK-21289][SQL][ML] Supports custom line separator fo...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/18581 Sure, will try to separate this. Will update my PRs soon roughly within this week. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r170510883 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,135 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); +this.obj = obj; +this.offset = offset; this.length = length; } + public MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. - */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. 
*/ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException(obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Instantiate the same type of MemoryBlock with new offset and size + */ + public abstract MemoryBlock allocate(long offset, long size); + + + public abstract int getInt(long offset); + + public abstract void putInt(long offset, int value); + + public abstract boolean getBoolean(long offset); + + public abstract void putBoolean(long offset, boolean value); + + public abstract byte getByte(long offset); + + public abstract void putByte(long offset, byte value); + + public abstract short getShort(long offset); + + public abstract void putShort(long offset, short value); + + public abstract long getLong(long offset); + + public abstract void putLong(long offset, long value); + + public abstract float getFloat(long offset); + + public abstract void putFloat(long offset, float value); + + public abstract double getDouble(long offset); + + public abstract void putDouble(long offset, double value); + + public static final void copyMemory( + MemoryBlock src, long srcOffset, MemoryBlock dst, long dstOffset, long length) { +Platform.copyMemory(src.getBaseObject(), src.getBaseOffset() + srcOffset, +dst.getBaseObject(), dst.getBaseOffset() + dstOffset, length); + } + + public static final void copyMemory(MemoryBlock src, MemoryBlock dst, long length) { +Platform.copyMemory(src.getBaseObject(), src.getBaseOffset(), + dst.getBaseObject(), dst.getBaseOffset(), length); + } + + public final void copyFrom(Object src, long srcOffset, long dstOffset, long
[GitHub] spark pull request #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser be...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20666#discussion_r170510616 --- Diff: python/pyspark/sql/readwriter.py --- @@ -393,13 +395,16 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. -* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record, and puts the malformed string into a field configured by \ - ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ - a string type field named ``columnNameOfCorruptRecord`` in an \ - user-defined schema. If a schema does not have the field, it drops corrupt \ - records during parsing. When a length of parsed CSV tokens is shorter than \ - an expected length of a schema, it sets `null` for extra fields. +* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ + into a field configured by ``columnNameOfCorruptRecord``, and sets other \ + fields to ``null``. To keep corrupt records, an user can set a string type \ --- End diff -- we can't say `and sets other fields to null`, as it's not the case for CSV --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser be...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20666#discussion_r170510529 --- Diff: python/pyspark/sql/readwriter.py --- @@ -209,13 +209,15 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. -* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record, and puts the malformed string into a field configured by \ - ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ - a string type field named ``columnNameOfCorruptRecord`` in an user-defined \ - schema. If a schema does not have the field, it drops corrupt records during \ - parsing. When inferring a schema, it implicitly adds a \ - ``columnNameOfCorruptRecord`` field in an output schema. +* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ + into a field configured by ``columnNameOfCorruptRecord``, and sets other \ + fields to ``null``. To keep corrupt records, an user can set a string type \ + field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ + schema does not have the field, it drops corrupt records during parsing. \ + When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \ + field in an output schema. It does not support partial results. Even just one \ --- End diff -- I think we can drop the last sentence. The doc is pretty clear saying `and sets other fields to null` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r170510525 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,135 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); +this.obj = obj; +this.offset = offset; this.length = length; } + public MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. - */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. 
*/ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException(obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Instantiate the same type of MemoryBlock with new offset and size + */ + public abstract MemoryBlock allocate(long offset, long size); + + + public abstract int getInt(long offset); + + public abstract void putInt(long offset, int value); + + public abstract boolean getBoolean(long offset); + + public abstract void putBoolean(long offset, boolean value); + + public abstract byte getByte(long offset); + + public abstract void putByte(long offset, byte value); + + public abstract short getShort(long offset); + + public abstract void putShort(long offset, short value); + + public abstract long getLong(long offset); + + public abstract void putLong(long offset, long value); + + public abstract float getFloat(long offset); + + public abstract void putFloat(long offset, float value); + + public abstract double getDouble(long offset); + + public abstract void putDouble(long offset, double value); + + public static final void copyMemory( + MemoryBlock src, long srcOffset, MemoryBlock dst, long dstOffset, long length) { +Platform.copyMemory(src.getBaseObject(), src.getBaseOffset() + srcOffset, +dst.getBaseObject(), dst.getBaseOffset() + dstOffset, length); + } + + public static final void copyMemory(MemoryBlock src, MemoryBlock dst, long length) { +Platform.copyMemory(src.getBaseObject(), src.getBaseOffset(), + dst.getBaseObject(), dst.getBaseOffset(), length); + } + + public final void copyFrom(Object src, long srcOffset, long dstOffset, long length)
[GitHub] spark pull request #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser be...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20666#discussion_r170510027 --- Diff: python/pyspark/sql/readwriter.py --- @@ -209,13 +209,15 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. -* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record, and puts the malformed string into a field configured by \ - ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ - a string type field named ``columnNameOfCorruptRecord`` in an user-defined \ - schema. If a schema does not have the field, it drops corrupt records during \ - parsing. When inferring a schema, it implicitly adds a \ - ``columnNameOfCorruptRecord`` field in an output schema. +* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ + into a field configured by ``columnNameOfCorruptRecord``, and sets other \ + fields to ``null``. To keep corrupt records, an user can set a string type \ + field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ + schema does not have the field, it drops corrupt records during parsing. \ + When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \ --- End diff -- Ah I thought this: ``` When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` field in an output schema. ``` describes schema inference because it adds `columnNameOfCorruptRecord` column if malformed record was found during schema inference. I mean ..: ```scala scala> spark.read.json(Seq("""{"a": 1}""", """{"a":""").toDS).printSchema() root |-- _corrupt_record: string (nullable = true) |-- a: long (nullable = true) scala> spark.read.json(Seq("""{"a": 1}""").toDS).printSchema() root |-- a: long (nullable = true) ``` but yes I think I misread it. Here we describe things mainly about malformed records already. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20553 > This is to avoid changing the semantics of spark.executor.cores and spark.task.cpus and their role in task scheduling, task parallelism, dynamic resource allocation, etc. The new configuration property only determines the physical CPU cores available to an executor. Do you mean `spark.kubernetes.executor.cores` will only be used with k8s for static allocation? It looks to me that if we want k8s to work better with Spark dynamic allocation, we have to change the semantics of `spark.executor.cores` to support fractions. Or we introduce a new dynamic allocation module for k8s, which reads `spark.kubernetes.executor.cores`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20618: [SPARK-23329][SQL] Fix documentation of trigonometric fu...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20618 cc @felixcheung (I saw you and Felix on the dev mailing list). So, https://github.com/apache/spark/tree/master/R#generating-documentation does not work? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20673 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20673 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87662/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20673 **[Test build #87662 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87662/testReport)** for PR 20673 at commit [`2df0f2b`](https://github.com/apache/spark/commit/2df0f2b6f1d8df1b4fee6534bbc649f487f6ae29). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20667: [SPARK-23508][CORE] Use timeStampedHashMap for Blockmana...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20667 Why do we need this cache? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20673 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87661/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20673 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20673 **[Test build #87661 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87661/testReport)** for PR 20673 at commit [`2df0f2b`](https://github.com/apache/spark/commit/2df0f2b6f1d8df1b4fee6534bbc649f487f6ae29). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20657: [SPARK-23361][yarn] Allow AM to restart after initial to...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/20657 Will review it soon. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r170505359 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -50,12 +52,11 @@ // These are only updated by readExternal() or read() @Nonnull - private Object base; - private long offset; + private MemoryBlock base; private int numBytes; --- End diff -- It's weird to see a `MemoryBlock` and `numBytes` pair, as `MemoryBlock` already has a `length` property. How about we allow projection on `MemoryBlock`? i.e. allow to increase the base offset and decrease the length. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
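A minimal illustration of the projection idea being floated here, i.e. a view that narrows an existing block by bumping the base offset and shrinking the length instead of carrying a separate `numBytes` (types and names below are made up for the example; this is not the real `MemoryBlock` API):

```scala
// Illustrative only: a copy-free "projection" over a region of memory.
final case class Region(base: Array[Byte], offset: Long, length: Long) {
  /** A view of `len` bytes starting `delta` bytes into this region, without copying. */
  def project(delta: Long, len: Long): Region = {
    require(delta >= 0 && len >= 0 && delta + len <= length, "projection out of bounds")
    Region(base, offset + delta, len)
  }
}

// Usage: a UTF8String-like consumer could then hold just a projected Region
// instead of a (Region, numBytes) pair.
val whole = Region(new Array[Byte](64), offset = 0, length = 64)
val slice = whole.project(delta = 16, len = 8)
```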
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r170504918 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java --- @@ -19,15 +19,24 @@ import org.apache.spark.unsafe.Platform; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; + +import sun.nio.ch.DirectBuffer; + /** * A simple {@link MemoryAllocator} that uses {@code Unsafe} to allocate off-heap memory. */ public class UnsafeMemoryAllocator implements MemoryAllocator { @Override - public MemoryBlock allocate(long size) throws OutOfMemoryError { + public OffHeapMemoryBlock allocate(long size) throws OutOfMemoryError { +// No usage of DirectByteBuffer.allocateDirect is current design --- End diff -- why mention this here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r170504851 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +44,149 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); +this.obj = obj; +this.offset = offset; this.length = length; } + public MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. - */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. */ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException(obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Instantiate the same type of MemoryBlock with new offset and size + */ + public abstract MemoryBlock allocate(long offset, long size); + + + public abstract int getInt(long offset); --- End diff -- hopefully the API would look like other data classes like `InternalRow` and `ColumnVector`, i.e. `getInt(int index)` and `putInt(int index, int value)`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r170504742 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +45,135 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); +this.obj = obj; +this.offset = offset; this.length = length; } + public MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. - */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. 
*/ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException(obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Instantiate the same type of MemoryBlock with new offset and size + */ + public abstract MemoryBlock allocate(long offset, long size); + + + public abstract int getInt(long offset); + + public abstract void putInt(long offset, int value); + + public abstract boolean getBoolean(long offset); + + public abstract void putBoolean(long offset, boolean value); + + public abstract byte getByte(long offset); + + public abstract void putByte(long offset, byte value); + + public abstract short getShort(long offset); + + public abstract void putShort(long offset, short value); + + public abstract long getLong(long offset); + + public abstract void putLong(long offset, long value); + + public abstract float getFloat(long offset); + + public abstract void putFloat(long offset, float value); + + public abstract double getDouble(long offset); + + public abstract void putDouble(long offset, double value); + + public static final void copyMemory( + MemoryBlock src, long srcOffset, MemoryBlock dst, long dstOffset, long length) { +Platform.copyMemory(src.getBaseObject(), src.getBaseOffset() + srcOffset, +dst.getBaseObject(), dst.getBaseOffset() + dstOffset, length); + } + + public static final void copyMemory(MemoryBlock src, MemoryBlock dst, long length) { +Platform.copyMemory(src.getBaseObject(), src.getBaseOffset(), + dst.getBaseObject(), dst.getBaseOffset(), length); + } + + public final void copyFrom(Object src, long srcOffset, long dstOffset, long
[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...
Github user liyinan926 commented on the issue: https://github.com/apache/spark/pull/20553 The value of `spark.executor.cores` will be used to set the CPU request for the executor pods if `spark.kubernetes.executor.cores` is not set. `spark.driver.cores` already allows fractional values, so we don't have a Kubernetes-specific property for the driver pod. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
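For illustration, the split between the two properties described above might be set like this (the values are made up; the fallback behavior is as described in the comment, not verified by this snippet):

```scala
import org.apache.spark.SparkConf

// spark.executor.cores keeps driving task scheduling / parallelism, while the
// k8s-specific property (fractional values allowed) only shapes the pod's CPU request.
val conf = new SparkConf()
  .set("spark.executor.cores", "1")              // slots seen by the task scheduler
  .set("spark.kubernetes.executor.cores", "0.5") // CPU request for the executor pod
// If spark.kubernetes.executor.cores were left unset, the CPU request would fall
// back to spark.executor.cores, per the comment above.
```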
[GitHub] spark issue #20674: [SPARK-23465][SQL] Introduce new function to rename colu...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20674 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/20553 What is the default value if it is not configured, and how does K8s control the CPU usage by default? Also, it seems users may not know how to differentiate between k8s executor cores and executor cores. BTW, do we also need a similar k8s driver cores property for cluster mode? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20618: [SPARK-23329][SQL] Fix documentation of trigonometric fu...
Github user misutoth commented on the issue: https://github.com/apache/spark/pull/20618 Sorry, I missed these comments. As I understood we fix all of them here. I am just struggling with the R documentation: it seems the generated doc is incorrect even if I just take the latest commit as it is. I thought about writing to the dev list to see if others have experienced this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20674: [SPARK-23465][SQL] Introduce new function to rename colu...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20674 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20674: [SPARK-23465][SQL] Introduce new function to rena...
GitHub user misutoth opened a pull request: https://github.com/apache/spark/pull/20674 [SPARK-23465][SQL] Introduce new function to rename columns using an algorithm ## What changes were proposed in this pull request? Add a convenient method to rename multiple columns at once by specifying a mapping between old and new column names. ## How was this patch tested? Wrote additional unit test cases, ran scalastyle. You can merge this pull request into a Git repository by running: $ git pull https://github.com/misutoth/spark column-rename Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20674.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20674 commit f03884c3582cdb3bc2eaa034cbc0bcdcfd1c8250 Author: Mihaly Toth Date: 2018-02-24T20:19:19Z [SPARK-23465][SQL] Introduce new function to rename columns using an algorithm --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
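The PR text above doesn't show the new method's signature, so purely as an illustration, a mapping-based rename could look roughly like this (the `withColumnsRenamed` name below is hypothetical, not taken from the patch):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("rename-sketch").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

// Today renaming several columns means chaining one call per column:
val renamed = df
  .withColumnRenamed("id", "user_id")
  .withColumnRenamed("value", "payload")

// Hypothetical shape of a mapping-based variant such a PR could add:
// val renamed2 = df.withColumnsRenamed(Map("id" -> "user_id", "value" -> "payload"))
```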
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20382 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20382 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1052/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20449#discussion_r170501977 --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala --- @@ -320,6 +319,55 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft f2.get() } + test("Interruptible iterator of shuffle reader") { --- End diff -- can we briefly explain what happened in this test? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20382 **[Test build #87664 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87664/testReport)** for PR 20382 at commit [`fd890ad`](https://github.com/apache/spark/commit/fd890ad837bb7068c70a27921d67af1c3fe65350). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/20382 Jenkins, retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20449#discussion_r170501590 --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala --- @@ -320,6 +319,55 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft f2.get() } + test("Interruptible iterator of shuffle reader") { +import JobCancellationSuite._ +val numSlice = 2 +sc = new SparkContext(s"local[$numSlice]", "test") + +val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) } + .repartitionAndSortWithinPartitions(new HashPartitioner(2)) + .mapPartitions { iter => +taskStartedSemaphore.release() +iter + }.foreachAsync { x => +if ( x._1 >= 10) { // this block of code is partially executed. --- End diff -- no space after `if(` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20449#discussion_r170501479 --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala --- @@ -18,15 +18,14 @@ package org.apache.spark import java.util.concurrent.Semaphore +import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import scala.concurrent.duration._ - import org.scalatest.BeforeAndAfter import org.scalatest.Matchers - --- End diff -- this will break the style check --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20449#discussion_r170501431 --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala --- @@ -104,9 +104,16 @@ private[spark] class BlockStoreShuffleReader[K, C]( context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled) context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes) +// Use completion callback to stop sorter if task was completed(either finished/cancelled). --- End diff -- then we can just write `if task was finished/cancelled.` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20624#discussion_r170499705 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala --- @@ -387,6 +390,143 @@ case class CatalogStatistics( } } +/** + * This class of statistics for a column is used in [[CatalogTable]] to interact with metastore. + */ +case class CatalogColumnStat( +distinctCount: Option[BigInt] = None, +min: Option[String] = None, +max: Option[String] = None, +nullCount: Option[BigInt] = None, +avgLen: Option[Long] = None, +maxLen: Option[Long] = None, +histogram: Option[Histogram] = None) { + + /** + * Returns a map from string to string that can be used to serialize the column stats. + * The key is the name of the column and name of the field (e.g. "colName.distinctCount"), + * and the value is the string representation for the value. + * min/max values are stored as Strings. They can be deserialized using + * [[CatalogColumnStat.fromExternalString]]. + * + * As part of the protocol, the returned map always contains a key called "version". + * Any of the fields that are null (None) won't appear in the map. + */ + def toMap(colName: String): Map[String, String] = { +val map = new scala.collection.mutable.HashMap[String, String] +map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1") +distinctCount.foreach { v => + map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString) +} +nullCount.foreach { v => + map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", v.toString) +} +avgLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) } +maxLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) } +min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) } +max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) } +histogram.foreach { h => + map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h)) +} +map.toMap + } + + /** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */ + def toPlanStat( + colName: String, + dataType: DataType): ColumnStat = +ColumnStat( + distinctCount = distinctCount, + min = min.map(CatalogColumnStat.fromExternalString(_, colName, dataType)), + max = max.map(CatalogColumnStat.fromExternalString(_, colName, dataType)), + nullCount = nullCount, + avgLen = avgLen, + maxLen = maxLen, + histogram = histogram) +} + +object CatalogColumnStat extends Logging { + + // List of string keys used to serialize CatalogColumnStat + val KEY_VERSION = "version" + private val KEY_DISTINCT_COUNT = "distinctCount" + private val KEY_MIN_VALUE = "min" + private val KEY_MAX_VALUE = "max" + private val KEY_NULL_COUNT = "nullCount" + private val KEY_AVG_LEN = "avgLen" + private val KEY_MAX_LEN = "maxLen" + private val KEY_HISTOGRAM = "histogram" + + /** + * Converts from string representation of data type to the corresponding Catalyst data type. 
+ */ + def fromExternalString(s: String, name: String, dataType: DataType): Any = { +dataType match { + case BooleanType => s.toBoolean + case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s)) + case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s)) + case ByteType => s.toByte + case ShortType => s.toShort + case IntegerType => s.toInt + case LongType => s.toLong + case FloatType => s.toFloat + case DoubleType => s.toDouble + case _: DecimalType => Decimal(s) + // This version of Spark does not use min/max for binary/string types so we ignore it. + case BinaryType | StringType => null + case _ => +throw new AnalysisException("Column statistics deserialization is not supported for " + + s"column $name of data type: $dataType.") +} + } + + /** + * Converts the given value from Catalyst data type to string representation of external + * data type. + */ + def toExternalString(v: Any, colName: String, dataType: DataType): String = { +val externalValue = dataType match { + case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int]) + case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long]) + case BooleanType | _: IntegralType | FloatType | DoubleType => v
[GitHub] spark issue #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser behavior ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20666 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1051/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser behavior ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20666 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser be...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20666#discussion_r170499119 --- Diff: python/pyspark/sql/readwriter.py --- @@ -209,13 +209,15 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. -* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record, and puts the malformed string into a field configured by \ - ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ - a string type field named ``columnNameOfCorruptRecord`` in an user-defined \ - schema. If a schema does not have the field, it drops corrupt records during \ - parsing. When inferring a schema, it implicitly adds a \ - ``columnNameOfCorruptRecord`` field in an output schema. +* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ + into a field configured by ``columnNameOfCorruptRecord``, and sets other \ + fields to ``null``. To keep corrupt records, an user can set a string type \ + field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ + schema does not have the field, it drops corrupt records during parsing. \ + When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \ + field in an output schema. It doesn't support partial results. Even just one \ --- End diff -- Ok. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser be...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20666#discussion_r170499102 --- Diff: python/pyspark/sql/readwriter.py --- @@ -393,13 +395,16 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. -* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record, and puts the malformed string into a field configured by \ - ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ - a string type field named ``columnNameOfCorruptRecord`` in an \ - user-defined schema. If a schema does not have the field, it drops corrupt \ - records during parsing. When a length of parsed CSV tokens is shorter than \ - an expected length of a schema, it sets `null` for extra fields. +* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ + into a field configured by ``columnNameOfCorruptRecord``, and sets other \ + fields to ``null``. To keep corrupt records, an user can set a string type \ + field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ + schema does not have the field, it drops corrupt records during parsing. \ + It supports partial result for the records just with less or more tokens \ + than the schema. When it meets a malformed record whose parsed tokens is \ --- End diff -- Ok. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser behavior ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20666 **[Test build #87663 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87663/testReport)** for PR 20666 at commit [`1d03d3b`](https://github.com/apache/spark/commit/1d03d3b248821a05dfd2751eeb0c8b657ebc9073). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser be...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20666#discussion_r170498519 --- Diff: python/pyspark/sql/readwriter.py --- @@ -209,13 +209,15 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. -* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record, and puts the malformed string into a field configured by \ - ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ - a string type field named ``columnNameOfCorruptRecord`` in an user-defined \ - schema. If a schema does not have the field, it drops corrupt records during \ - parsing. When inferring a schema, it implicitly adds a \ - ``columnNameOfCorruptRecord`` field in an output schema. +* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ + into a field configured by ``columnNameOfCorruptRecord``, and sets other \ + fields to ``null``. To keep corrupt records, an user can set a string type \ + field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ + schema does not have the field, it drops corrupt records during parsing. \ + When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \ --- End diff -- When users set a string type field named `columnNameOfCorruptRecord` in an user-defined schema, even no corrupted record, I think the field is still added. Or I misread this sentence? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
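For reference, the behavior discussed in this thread can be checked by passing a user-defined schema that already contains the corrupt-record column (a small sketch using the default `_corrupt_record` column name and `PERMISSIVE` mode):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("corrupt-record-sketch").getOrCreate()
import spark.implicits._

// User-defined schema that explicitly declares the corrupt-record column.
val schema = StructType(Seq(
  StructField("a", LongType, nullable = true),
  StructField("_corrupt_record", StringType, nullable = true)))

// One well-formed and one malformed record: in PERMISSIVE mode the malformed
// string lands in _corrupt_record; the declared column is part of the schema
// whether or not any record is actually corrupted.
val df = spark.read.schema(schema).json(Seq("""{"a": 1}""", """{"a":""").toDS)
df.printSchema()
df.show(truncate = false)
```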
[GitHub] spark issue #18581: [SPARK-21289][SQL][ML] Supports custom line separator fo...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/18581 It looks like this line separator has to be handled by each data source individually; can we start with, e.g., json, and then csv, text, etc.? Then we can have smaller PRs that would be easier to review. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
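As a sketch of what the per-source option could look like once the PR is split up (the `lineSep` option name is an assumption on my part about how the option would be spelled, not something confirmed in this thread):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("linesep-sketch").getOrCreate()

// Hypothetical usage: a custom record separator handled by the JSON source itself,
// instead of relying on "\n"; option name and availability are assumptions here.
val df = spark.read
  .option("lineSep", "\u0001")
  .json("/path/to/data.json") // illustrative path
```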
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20382 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20382 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87660/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20382 **[Test build #87660 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87660/testReport)** for PR 20382 at commit [`fd890ad`](https://github.com/apache/spark/commit/fd890ad837bb7068c70a27921d67af1c3fe65350). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20382 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20382 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87659/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20382 **[Test build #87659 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87659/testReport)** for PR 20382 at commit [`5011372`](https://github.com/apache/spark/commit/501137269c983e4d028eba817d1c5f45a305171d). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20557: [SPARK-23364][SQL]'desc table' command in spark-sql add ...
Github user guoxiaolongzte commented on the issue: https://github.com/apache/spark/pull/20557 Well, for now, I don't have a better solution. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20543: [SPARK-23357][CORE] 'SHOW TABLE EXTENDED LIKE pattern=ST...
Github user guoxiaolongzte commented on the issue: https://github.com/apache/spark/pull/20543 Oh, I just think adding it makes it clearer. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20673 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20673 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1050/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20673 **[Test build #87662 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87662/testReport)** for PR 20673 at commit [`2df0f2b`](https://github.com/apache/spark/commit/2df0f2b6f1d8df1b4fee6534bbc649f487f6ae29). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20673 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1049/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20673 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20673 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87658/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/20673 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20673 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20673 **[Test build #87658 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87658/testReport)** for PR 20673 at commit [`7741880`](https://github.com/apache/spark/commit/774188003c5b1c1a000d69f5996dce580c7a1432). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20670: [SPARK-23405] Add constraints
Github user KaiXinXiaoLei commented on the issue: https://github.com/apache/spark/pull/20670 @srowen Let me restate the problem. I have a small table `ls` with one row and a big table `catalog_sales` with one hundred billion rows; in the big table, only about one million rows have a non-null `cs_order_number`. I join these tables with the query `select ls.cs_order_number from ls left semi join catalog_sales cs on ls.cs_order_number = cs.cs_order_number`. While the job runs there is heavy data skew, and I found that the null values cause it. The join condition is `ls.cs_order_number = cs.cs_order_number`. In the Optimized Logical Plan the left table gets a "Filter isnotnull(cs_order_number#1)" node, so I think the right table should also get a "Filter isnotnull" node. The right table would then filter out the null values first and join with the left table afterwards, so the null values would no longer cause data skew. Using this idea, my SQL runs successfully. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
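As a rough illustration of the scenario above, here is a sketch of the original query and of the manual workaround of filtering null join keys on the big side first (table and column names are taken from the comment; a SparkSession `spark` is assumed, and an inferred IsNotNull constraint on the right child, which this PR is about, would make the explicit filter unnecessary):
```scala
// Original query: the null cs_order_number rows on the big side all shuffle to
// the same partition and create the skewed task described above.
val original = spark.sql(
  """SELECT ls.cs_order_number
    |FROM ls LEFT SEMI JOIN catalog_sales cs
    |  ON ls.cs_order_number = cs.cs_order_number""".stripMargin)

// Manual workaround: drop the null join keys on the big side before the join,
// which is what an inferred Filter isnotnull(cs_order_number) on the right
// child would also achieve.
val filtered = spark.sql(
  """SELECT ls.cs_order_number
    |FROM ls LEFT SEMI JOIN
    |  (SELECT * FROM catalog_sales WHERE cs_order_number IS NOT NULL) cs
    |  ON ls.cs_order_number = cs.cs_order_number""".stripMargin)

// Compare the optimized plans to check whether the IsNotNull filter reached
// the right side of the join.
original.explain(true)
filtered.explain(true)
```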
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20673 **[Test build #87661 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87661/testReport)** for PR 20673 at commit [`2df0f2b`](https://github.com/apache/spark/commit/2df0f2b6f1d8df1b4fee6534bbc649f487f6ae29). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20673 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20673 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1048/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferH...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/20636#discussion_r170482768 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala --- @@ -58,15 +58,20 @@ object BufferHolderSparkSubmitSuite { val holder = new BufferHolder(new UnsafeRow(1000)) holder.reset() +// execute here since reset() updates holder.cursor +val smallBuffer = new Array[Byte](holder.cursor) --- End diff -- ping @liufengdb and @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20624#discussion_r170474554 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -1059,22 +1054,22 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat None } else { - val colStats = new mutable.HashMap[String, ColumnStat] - - // For each column, recover its column stats. Note that this is currently a O(n^2) operation, - // but given the number of columns it usually not enormous, this is probably OK as a start. - // If we want to map this a linear operation, we'd need a stronger contract between the - // naming convention used for serialization. - schema.foreach { field => -if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) { - // If "version" field is defined, then the column stat is defined. - val keyPrefix = columnStatKeyPropName(field.name, "") - val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) => -(k.drop(keyPrefix.length), v) - } - ColumnStat.fromMap(table, field, colStatMap).foreach { cs => -colStats += field.name -> cs - } + val colStats = new mutable.HashMap[String, CatalogColumnStat] + val statPropsForField = new mutable.HashMap[String, mutable.HashMap[String, String]] --- End diff -- This is useless. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20624#discussion_r170480388 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala --- @@ -154,4 +156,120 @@ case class AnalyzeColumnCommand( AttributeMap(attributePercentiles.toSeq) } + /** Returns true iff the we support gathering column statistics on column of the given type. */ + private def supportsType(dataType: DataType): Boolean = dataType match { +case _: IntegralType => true +case _: DecimalType => true +case DoubleType | FloatType => true +case BooleanType => true +case DateType => true +case TimestampType => true +case BinaryType | StringType => true +case _ => false + } + + /** Returns true iff the we support gathering histogram on column of the given type. */ + private def supportsHistogram(dataType: DataType): Boolean = dataType match { +case _: IntegralType => true +case _: DecimalType => true +case DoubleType | FloatType => true +case DateType => true +case TimestampType => true +case _ => false + } + + /** + * Constructs an expression to compute column statistics for a given column. + * + * The expression should create a single struct column with the following schema: + * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, maxLen: Long, + * distinctCountsForIntervals: Array[Long] + * + * Together with [[rowToColumnStat]], this function is used to create [[ColumnStat]] and + * as a result should stay in sync with it. + */ + private def statExprs( +col: Attribute, +conf: SQLConf, +colPercentiles: AttributeMap[ArrayData]): CreateNamedStruct = { +def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map { expr => + expr.transformUp { case af: AggregateFunction => af.toAggregateExpression() } +}) +val one = Literal(1, LongType) + +// the approximate ndv (num distinct value) should never be larger than the number of rows +val numNonNulls = if (col.nullable) Count(col) else Count(one) +val ndv = Least(Seq(HyperLogLogPlusPlus(col, conf.ndvMaxError), numNonNulls)) +val numNulls = Subtract(Count(one), numNonNulls) +val defaultSize = Literal(col.dataType.defaultSize, LongType) +val nullArray = Literal(null, ArrayType(LongType)) + +def fixedLenTypeStruct: CreateNamedStruct = { + val genHistogram = +supportsHistogram(col.dataType) && colPercentiles.contains(col) + val intervalNdvsExpr = if (genHistogram) { +ApproxCountDistinctForIntervals(col, + Literal(colPercentiles(col), ArrayType(col.dataType)), conf.ndvMaxError) + } else { +nullArray + } + // For fixed width types, avg size should be the same as max size. + struct(ndv, Cast(Min(col), col.dataType), Cast(Max(col), col.dataType), numNulls, +defaultSize, defaultSize, intervalNdvsExpr) +} + +col.dataType match { + case _: IntegralType => fixedLenTypeStruct + case _: DecimalType => fixedLenTypeStruct + case DoubleType | FloatType => fixedLenTypeStruct + case BooleanType => fixedLenTypeStruct + case DateType => fixedLenTypeStruct + case TimestampType => fixedLenTypeStruct + case BinaryType | StringType => +// For string and binary type, we don't compute min, max or histogram +val nullLit = Literal(null, col.dataType) +struct( + ndv, nullLit, nullLit, numNulls, + // Set avg/max size to default size if all the values are null or there is no value. 
+ Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)), + Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)), + nullArray) + case _ => +throw new AnalysisException("Analyzing column statistics is not supported for column " + + s"${col.name} of data type: ${col.dataType}.") +} + } + + /** Convert a struct for column stats (defined in `statExprs`) into [[ColumnStat]]. */ + private def rowToColumnStat( +row: InternalRow, +attr: Attribute, +rowCount: Long, +percentiles: Option[ArrayData]): ColumnStat = { +// The first 6 fields are basic column stats, the 7th is ndvs for histogram bins. +val cs = ColumnStat( + distinctCount = Option(BigInt(row.getLong(0))), + // for string/binary min/max, get should return null + min = Option(row.get(1, attr.dataType)), + max = Option(row.get(2, attr.dataType)), +
[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20624#discussion_r170475159 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala --- @@ -187,11 +187,11 @@ object StarSchemaDetection extends PredicateHelper { stats.rowCount match { case Some(rowCount) if rowCount >= 0 => if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) { -val colStats = stats.attributeStats.get(col) -if (colStats.get.nullCount > 0) { +val colStats = stats.attributeStats.get(col).get +if (!colStats.hasCountStats || colStats.nullCount.get > 0) { --- End diff -- Do we need to check whether it is defined before calling `.get`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
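A small self-contained sketch of the defensive variant this question points at; the two-field case class is only a stub standing in for the real `CatalogColumnStat`. `Option.forall` is true for `None`, so a missing null count is handled conservatively and the bare `.get` disappears:
```scala
// Stub with only the fields used here; the real CatalogColumnStat carries more.
case class CatalogColumnStat(distinctCount: Option[BigInt], nullCount: Option[BigInt]) {
  def hasCountStats: Boolean = distinctCount.isDefined && nullCount.isDefined
}

// Equivalent to `!hasCountStats || nullCount.get > 0` when both counts are
// present, but safe when they are not.
def mayHaveNulls(colStats: CatalogColumnStat): Boolean =
  !colStats.hasCountStats || colStats.nullCount.forall(_ > 0)
```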
[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20624#discussion_r170474720 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -1059,22 +1054,22 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat None } else { - val colStats = new mutable.HashMap[String, ColumnStat] - - // For each column, recover its column stats. Note that this is currently a O(n^2) operation, - // but given the number of columns it usually not enormous, this is probably OK as a start. - // If we want to map this a linear operation, we'd need a stronger contract between the - // naming convention used for serialization. - schema.foreach { field => -if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) { - // If "version" field is defined, then the column stat is defined. - val keyPrefix = columnStatKeyPropName(field.name, "") - val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) => -(k.drop(keyPrefix.length), v) - } - ColumnStat.fromMap(table, field, colStatMap).foreach { cs => -colStats += field.name -> cs - } + val colStats = new mutable.HashMap[String, CatalogColumnStat] + val statPropsForField = new mutable.HashMap[String, mutable.HashMap[String, String]] + + val colStatsProps = properties.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX)).map { +case (k, v) => k.drop(STATISTICS_COL_STATS_PREFIX.length) -> v + } + + // Find all the column names by matching the KEY_VERSION properties for them. + val fieldNames = colStatsProps.keys.filter { --- End diff -- `fieldNames` is not being used. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
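For context on the code path these review comments touch, a minimal end-to-end sketch (table and column names are illustrative only; assumes a SparkSession `spark` and Spark 2.3+ SQL syntax) that makes Spark compute per-column statistics and persist them as the table properties recovered by `HiveExternalCatalog` above:
```scala
spark.sql("CREATE TABLE t (id INT, name STRING) USING parquet")
spark.sql("INSERT INTO t VALUES (1, 'a'), (2, NULL)")

// Runs AnalyzeColumnCommand, which builds the per-column stat struct
// (ndv, min, max, null count, avg/max length) shown in the statExprs diff.
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS id, name")

// The stats are serialized into table properties (the STATISTICS_COL_STATS_PREFIX
// keys seen in the diff) and read back on the HiveExternalCatalog restore path.
spark.sql("DESCRIBE EXTENDED t id").show(truncate = false)
```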
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20382 **[Test build #87660 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87660/testReport)** for PR 20382 at commit [`fd890ad`](https://github.com/apache/spark/commit/fd890ad837bb7068c70a27921d67af1c3fe65350). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20382 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20382 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1047/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20382 **[Test build #87659 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87659/testReport)** for PR 20382 at commit [`5011372`](https://github.com/apache/spark/commit/501137269c983e4d028eba817d1c5f45a305171d). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20382 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1046/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20382: [SPARK-23097][SQL][SS] Migrate text socket source to V2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20382 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20672: [SPARK-23509][Build] Upgrade commons-net from 2.2 to 3.1
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20672 cc @srowen --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20673#discussion_r170477999 --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala --- @@ -100,7 +102,16 @@ private[spark] object JsonProtocol { executorMetricsUpdateToJson(metricsUpdate) case blockUpdate: SparkListenerBlockUpdated => blockUpdateToJson(blockUpdate) - case _ => parse(mapper.writeValueAsString(event)) + case _ => +val outputStream = new PipedOutputStream() +val inputStream = new PipedInputStream(outputStream) +try { + mapper.writeValue(outputStream, event) + parse(inputStream) +} finally { + IOUtils.closeQuietly(outputStream) --- End diff -- and ... another note for `IOUtils.closeQuietly` saying that it is intentionally used because the close might already have been attempted by Jackson's library, if I understood correctly? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20673#discussion_r170478050 --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala --- @@ -17,13 +17,15 @@ package org.apache.spark.util +import java.io.{ByteArrayOutputStream, PipedInputStream, PipedOutputStream} --- End diff -- `ByteArrayOutputStream` seems not used here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20673#discussion_r170477844 --- Diff: core/src/main/scala/org/apache/spark/util/JsonProtocol.scala --- @@ -100,7 +102,16 @@ private[spark] object JsonProtocol { executorMetricsUpdateToJson(metricsUpdate) case blockUpdate: SparkListenerBlockUpdated => blockUpdateToJson(blockUpdate) - case _ => parse(mapper.writeValueAsString(event)) + case _ => +val outputStream = new PipedOutputStream() --- End diff -- Hi @brkyvz, how about adding a note that `PipedOutputStream` is intentionally used here to avoid additional memory consumption, if I understood this correctly? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20673 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1045/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20673 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20673: [SPARK-23515] Use input/output streams for large events ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20673 **[Test build #87658 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87658/testReport)** for PR 20673 at commit [`7741880`](https://github.com/apache/spark/commit/774188003c5b1c1a000d69f5996dce580c7a1432). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20673: [SPARK-23515] Use input/output streams for large ...
GitHub user brkyvz opened a pull request: https://github.com/apache/spark/pull/20673 [SPARK-23515] Use input/output streams for large events in JsonProtocol.sparkEventToJson ## What changes were proposed in this pull request? `def sparkEventToJson(event: SparkListenerEvent)` has a fallback path which creates a JSON object by serializing an unrecognized event to a JSON string and then parsing it again. This materializes the whole string just to parse the JSON record, which is unnecessary and can cause OOMs as seen in the stack trace below: ``` java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOfRange(Arrays.java:3664) at java.lang.String.<init>(String.java:207) at java.lang.StringBuilder.toString(StringBuilder.java:407) at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:356) at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.getText(ReaderBasedJsonParser.java:235) at org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:20) at org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:42) at org.json4s.jackson.JValueDeserializer.deserialize(JValueDeserializer.scala:35) at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726) at org.json4s.jackson.JsonMethods$class.parse(JsonMethods.scala:20) at org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:50) at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:103) ``` We should just use stream parsing to avoid such OOMs. ## How was this patch tested? You can merge this pull request into a Git repository by running: $ git pull https://github.com/brkyvz/spark eventLoggingJson Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20673.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20673 commit 774188003c5b1c1a000d69f5996dce580c7a1432 Author: Burak Yavuz Date: 2018-02-25T20:07:22Z use streams for large events --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
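As a standalone illustration of the stream-based idea (not the PR's exact code): Jackson writes the event straight into a pipe that json4s parses from, so the full JSON string is never materialized. `PipedInputStream`/`PipedOutputStream` are documented for use between two threads, so this sketch runs the writer on its own thread; the object and helper names are hypothetical.
```scala
import java.io.{PipedInputStream, PipedOutputStream}

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object StreamingEventJson {
  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  // Serialize `event` to a JValue without building the intermediate JSON
  // string in memory.
  def eventToJValue(event: AnyRef): JValue = {
    val out = new PipedOutputStream()
    val in = new PipedInputStream(out)
    // The writer runs on its own thread so parse() below can drain the pipe
    // as Jackson fills it; the pipe's fixed-size buffer never has to hold
    // the whole document.
    val writer = new Thread(new Runnable {
      override def run(): Unit = {
        try mapper.writeValue(out, event) finally out.close()
      }
    })
    writer.start()
    try parse(in) finally in.close()
  }
}
```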
[GitHub] spark issue #20624: [SPARK-23445] ColumnStat refactoring
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20624 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87657/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20624: [SPARK-23445] ColumnStat refactoring
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20624 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20624: [SPARK-23445] ColumnStat refactoring
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20624 **[Test build #87657 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87657/testReport)** for PR 20624 at commit [`0406f52`](https://github.com/apache/spark/commit/0406f52ff9bae43890ab9f36616a1474f0fabd58). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20672: [SPARK-23509][Build] Upgrade commons-net from 2.2 to 3.1
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20672 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87656/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20672: [SPARK-23509][Build] Upgrade commons-net from 2.2 to 3.1
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20672 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20672: [SPARK-23509][Build] Upgrade commons-net from 2.2 to 3.1
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20672 **[Test build #87656 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87656/testReport)** for PR 20672 at commit [`745f9f9`](https://github.com/apache/spark/commit/745f9f9c6fc7dbc2f5b69cd87ab41392d16cfca2). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20671: [SPARK-23510][SQL] Support Hive 2.2 and Hive 2.3 metasto...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20671 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20671: [SPARK-23510][SQL] Support Hive 2.2 and Hive 2.3 metasto...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20671 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87654/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20671: [SPARK-23510][SQL] Support Hive 2.2 and Hive 2.3 metasto...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20671 **[Test build #87654 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87654/testReport)** for PR 20671 at commit [`2e12c10`](https://github.com/apache/spark/commit/2e12c10cb5b0f6f901c89e61db2c51b18b6cf92c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20671: [SPARK-23510][SQL] Support Hive 2.2 and Hive 2.3 metasto...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20671 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20671: [SPARK-23510][SQL] Support Hive 2.2 and Hive 2.3 metasto...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20671 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87653/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20671: [SPARK-23510][SQL] Support Hive 2.2 and Hive 2.3 metasto...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20671 **[Test build #87653 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87653/testReport)** for PR 20671 at commit [`0fdf29b`](https://github.com/apache/spark/commit/0fdf29b78d8b8588a4af10238477a27303e1f587). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20624: [SPARK-23445] ColumnStat refactoring
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20624 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1044/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org