[GitHub] spark pull request #20407: [SPARK-23124][SQL] Allow to disable BroadcastNest...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20407#discussion_r167392027

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -156,6 +156,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

+  val ALLOW_NESTEDJOIN_FALLBACK = buildConf("spark.sql.join.broadcastJoinFallback.enabled")
+    .internal()
+    .doc("When true (default), if the other options are not available, fallback to try and use " +
+      "BroadcastNestedLoopJoin as join strategy. This can cause OOM which can be a problem " +
+      "in some scenarios, eg. when running the thriftserver. Turn to false to disable it: an " +
+      "AnalysisException will be thrown.")
--- End diff --

Why? Could you please explain which would be the right direction, then?

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20519: [Spark-23240][python] Don't let python site customizatio...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20519

I am not yet fully sure how sitecustomize writes some bytes to stdout, but I believe it doesn't always happen, because we at least have a sitecustomize for Python coverage - https://github.com/apache/spark/blob/master/python/test_coverage/sitecustomize.py. The only way I could verify this PR and https://github.com/apache/spark/pull/20424 was to manually add some bytes to stdout. So, I still think it's a corner case. Can we otherwise simply flush and clear stdout right before starting the daemon?
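The failure mode being worked around here can be illustrated without Spark: the daemon announces itself with a binary handshake on stdout, so any bytes a `sitecustomize` module prints first corrupt the stream. A minimal stdlib-only sketch (the port number and stream handling are invented for illustration, not taken from the PR):

```python
import io
import struct

def write_handshake(stream):
    # Daemon handshake sketch: write a port number as a big-endian int.
    stream.write(struct.pack(">i", 4040))

# If site customization already wrote stray bytes, the reader misparses
# the handshake:
polluted = io.BytesIO()
polluted.write(b"hello from sitecustomize\n")  # stray output
write_handshake(polluted)
polluted.seek(0)
bad_port = struct.unpack(">i", polluted.read(4))[0]

# Starting from a clean stream (stray output flushed and discarded first),
# the handshake parses correctly:
clean = io.BytesIO()
write_handshake(clean)
clean.seek(0)
good_port = struct.unpack(">i", clean.read(4))[0]
```

Flushing and discarding pending output before the handshake, as suggested above, guarantees the reader sees the binary data first.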
[GitHub] spark issue #20531: [SPARK-23352][PYTHON] Explicitly specify supported types...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20531 **[Test build #87280 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87280/testReport)** for PR 20531 at commit [`07f2d78`](https://github.com/apache/spark/commit/07f2d781022a44186fe5ab4c5621d80acce0711f).
[GitHub] spark issue #20531: [SPARK-23352][PYTHON] Explicitly specify supported types...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20531 Merged build finished. Test PASSed.
[GitHub] spark issue #20531: [SPARK-23352][PYTHON] Explicitly specify supported types...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20531 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/766/ Test PASSed.
[GitHub] spark pull request #20531: [SPARK-23352][PYTHON] Explicitly specify supporte...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20531#discussion_r167391819

--- Diff: python/pyspark/sql/tests.py ---
@@ -4509,23 +4523,32 @@ def weighted_mean(v, w):
             return weighted_mean

     def test_manual(self):
+        from pyspark.sql.functions import pandas_udf, array
+
         df = self.data
         sum_udf = self.pandas_agg_sum_udf
         mean_udf = self.pandas_agg_mean_udf
-
-        result1 = df.groupby('id').agg(sum_udf(df.v), mean_udf(df.v)).sort('id')
+        mean_arr_udf = pandas_udf(
+            self.pandas_agg_mean_udf.func,
--- End diff --

Hm, let's handle the type coercion stuff separately in another issue. I think it needs another iteration.
[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20561 **[Test build #87279 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87279/testReport)** for PR 20561 at commit [`8dab79a`](https://github.com/apache/spark/commit/8dab79a46d9201ec0e43e60d6ef841bf91f4c616).
[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20561 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/765/ Test PASSed.
[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20561 Merged build finished. Test PASSed.
[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20561 retest this please
[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/20555#discussion_r167391619

--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
@@ -258,54 +262,43 @@ public int read(byte[] b, int offset, int len) throws IOException {
     if (len == 0) {
       return 0;
     }
-    stateChangeLock.lock();
-    try {
-      return readInternal(b, offset, len);
-    } finally {
-      stateChangeLock.unlock();
-    }
-  }
-
-  /**
-   * flip the active and read ahead buffer
-   */
-  private void swapBuffers() {
-    ByteBuffer temp = activeBuffer;
-    activeBuffer = readAheadBuffer;
-    readAheadBuffer = temp;
-  }
-
-  /**
-   * Internal read function which should be called only from read() api. The assumption is that
-   * the stateChangeLock is already acquired in the caller before calling this function.
-   */
-  private int readInternal(byte[] b, int offset, int len) throws IOException {
-    assert (stateChangeLock.isLocked());
     if (!activeBuffer.hasRemaining()) {
-      waitForAsyncReadComplete();
-      if (readAheadBuffer.hasRemaining()) {
-        swapBuffers();
-      } else {
-        // The first read or activeBuffer is skipped.
-        readAsync();
+      // No remaining in active buffer - lock and switch to write ahead buffer.
+      stateChangeLock.lock();
+      try {
         waitForAsyncReadComplete();
-        if (isEndOfStream()) {
-          return -1;
+        if (!readAheadBuffer.hasRemaining()) {
+          // The first read or activeBuffer is skipped.
+          readAsync();
+          waitForAsyncReadComplete();
+          if (isEndOfStream()) {
+            return -1;
+          }
         }
+        // Swap the newly read read ahead buffer in place of empty active buffer.
--- End diff --

Would it be better to use `read-ahead` in the comments, for ease of reading?
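For readers following the diff: the removed `swapBuffers()` implements classic double buffering — serve reads from the active buffer and, when it drains, swap in the buffer the background reader filled. A toy single-threaded Python model of that idea (the class and chunk source are invented for illustration; the real code uses `ByteBuffer`s, a lock, and a background thread):

```python
class DoubleBuffer:
    """Sketch of double buffering: reads drain `active`; when empty,
    swap in `read_ahead` and refill it from the underlying source."""

    def __init__(self, chunks):
        self._chunks = iter(chunks)     # stands in for the wrapped stream
        self.active = bytearray()
        self.read_ahead = bytearray()
        self._fill_read_ahead()         # kick off the first "async" read

    def _fill_read_ahead(self):
        self.read_ahead = bytearray(next(self._chunks, b""))

    def _swap(self):
        # the equivalent of swapBuffers() in the diff above
        self.active, self.read_ahead = self.read_ahead, self.active

    def read(self, n):
        if not self.active:
            self._swap()                # take over the read-ahead buffer
            self._fill_read_ahead()     # start the next "async" read
        out = bytes(self.active[:n])
        del self.active[:n]
        return out

buf = DoubleBuffer([b"abc", b"def"])
first = buf.read(3)   # served from the first chunk
```

The hot-path optimization in the PR is that the swap only needs the lock when the active buffer is empty; the common case reads without locking.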
[GitHub] spark pull request #20559: [SPARK-23360][SQL][PYTHON] Get local timezone fro...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20559#discussion_r167391496

--- Diff: python/pyspark/sql/tests.py ---
@@ -2867,6 +2867,35 @@ def test_create_dataframe_required_pandas_not_found(self):
             "d": [pd.Timestamp.now().date()]})
         self.spark.createDataFrame(pdf)

+    # Regression test for SPARK-23360
+    @unittest.skipIf(not _have_pandas, _pandas_requirement_message)
+    def test_create_dateframe_from_pandas_with_dst(self):
+        import pandas as pd
+        from datetime import datetime
+
+        pdf = pd.DataFrame({'time': [datetime(2015, 10, 31, 22, 30)]})
+
+        df = self.spark.createDataFrame(pdf)
+        self.assertPandasEqual(pdf, df.toPandas())
+
+        orig_env_tz = os.environ.get('TZ', None)
+        orig_session_tz = self.spark.conf.get('spark.sql.session.timeZone')
+        try:
+            tz = 'America/Los_Angeles'
+            os.environ['TZ'] = tz
+            time.tzset()
+            self.spark.conf.set('spark.sql.session.timeZone', tz)
+
+            df = self.spark.createDataFrame(pdf)
+            df.show()
--- End diff --

A gentle reminder about this. It seems it's there for debugging purposes, I guess? :)
[GitHub] spark pull request #20559: [SPARK-23360][SQL][PYTHON] Get local timezone fro...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20559#discussion_r167391409

--- Diff: python/pyspark/sql/types.py ---
@@ -1766,15 +1781,13 @@ def _check_series_convert_timestamps_localize(s, from_timezone, to_timezone):
     import pandas as pd
     from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype
-    from_tz = from_timezone or 'tzlocal()'
-    to_tz = to_timezone or 'tzlocal()'
+    from_tz = from_timezone or _get_local_timezone()
+    to_tz = to_timezone or _get_local_timezone()
     # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
     if is_datetime64tz_dtype(s.dtype):
         return s.dt.tz_convert(to_tz).dt.tz_localize(None)
     elif is_datetime64_dtype(s.dtype) and from_tz != to_tz:
-        # `s.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT.
-        return s.apply(lambda ts: ts.tz_localize(from_tz).tz_convert(to_tz).tz_localize(None)
-                       if ts is not pd.NaT else pd.NaT)
+        return s.dt.tz_localize(from_tz).dt.tz_convert(to_tz).dt.tz_localize(None)
--- End diff --

@ueshin, is it safe to remove `if ts is not pd.NaT else pd.NaT`? It seems there is a small possibility of a problem with `tzlocal()`: https://github.com/pandas-dev/pandas/blob/0.19.x/pandas/tslib.pyx#L1760 https://github.com/pandas-dev/pandas/blob/0.19.x/pandas/tslib.pyx#L54 https://github.com/dateutil/dateutil/blob/2.6.1/dateutil/tz/tz.py#L1362 https://github.com/dateutil/dateutil/blob/2.6.1/dateutil/tz/tz.py#L1408
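The guard under discussion exists because a missing-value sentinel must pass through a conversion untouched. A tiny stdlib illustration of the pattern (`NAT` here is a made-up stand-in for `pd.NaT`, and `convert` is an invented element-wise function, not the pandas API):

```python
# A sentinel object representing a missing timestamp, like pd.NaT.
NAT = object()

def convert(ts, offset):
    # Element-wise conversion with an explicit guard, mirroring the
    # removed `... if ts is not pd.NaT else pd.NaT` branch: the sentinel
    # must come out unchanged instead of being "converted".
    return ts + offset if ts is not NAT else NAT
```

The question in the review is whether pandas' vectorized `tz_localize`/`tz_convert` already handle `NaT` safely, making the per-element guard redundant.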
[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/20555#discussion_r167391309

--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
@@ -232,7 +229,9 @@ private void waitForAsyncReadComplete() throws IOException {
     stateChangeLock.lock();
     try {
       while (readInProgress) {
+        isWaiting.set(true);
         asyncReadComplete.await();
+        isWaiting.set(false);
--- End diff --

What happens if `await()` throws an exception? Is it OK not to update `isWaiting`?
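One way to address this concern, sketched in stdlib Python rather than Java (class and method names are invented): reset the flag in a `finally` block, so that an exception thrown while waiting cannot leave `isWaiting` stuck at true.

```python
import threading
import time

class AsyncReadState:
    """Sketch of the isWaiting flag from the diff above. The flag is
    reset in `finally`, so an exception raised by wait() cannot leave
    it stuck at True -- the failure mode the review asks about."""

    def __init__(self):
        self.cond = threading.Condition()
        self.read_in_progress = False
        self.is_waiting = False

    def complete_read(self):
        # Called by the background reader when the async read finishes.
        with self.cond:
            self.read_in_progress = False
            self.cond.notify_all()

    def wait_for_async_read_complete(self):
        with self.cond:
            self.is_waiting = True
            try:
                while self.read_in_progress:
                    self.cond.wait()
            finally:
                self.is_waiting = False  # runs even if wait() raises

state = AsyncReadState()
state.read_in_progress = True
threading.Thread(
    target=lambda: (time.sleep(0.01), state.complete_read())).start()
state.wait_for_async_read_complete()
```

In the Java code the analogous fix would wrap `asyncReadComplete.await()` in try/finally, since `await()` can throw `InterruptedException`.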
[GitHub] spark issue #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezone from ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20559 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87278/ Test PASSed.
[GitHub] spark issue #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezone from ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20559 **[Test build #87278 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87278/testReport)** for PR 20559 at commit [`e20e9fd`](https://github.com/apache/spark/commit/e20e9fdba7fd2a4db059fe8016f0a0a60f3dd71d). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezone from ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20559 Merged build finished. Test PASSed.
[GitHub] spark issue #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezone from ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20559 **[Test build #87278 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87278/testReport)** for PR 20559 at commit [`e20e9fd`](https://github.com/apache/spark/commit/e20e9fdba7fd2a4db059fe8016f0a0a60f3dd71d).
[GitHub] spark issue #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezone from ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20559 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/764/ Test PASSed.
[GitHub] spark issue #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezone from ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20559 Merged build finished. Test PASSed.
[GitHub] spark issue #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezone from ...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/20559 @BryanCutler It seems pandas handles `tzlocal()` differently from other timezones, and I guess it might handle DST incorrectly.
[GitHub] spark pull request #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezon...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/20559#discussion_r167390815

--- Diff: python/pyspark/sql/types.py ---
@@ -1709,6 +1709,15 @@ def _check_dataframe_convert_date(pdf, schema):
     return pdf

+def _get_local_timezone():
+    """ Get local timezone from environment vi pytz, or dateutil. """
--- End diff --

I modified and added the comment. Is the comment clear enough?
[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20561 Merged build finished. Test FAILed.
[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20561 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87277/ Test FAILed.
[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20561 **[Test build #87277 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87277/testReport)** for PR 20561 at commit [`8dab79a`](https://github.com/apache/spark/commit/8dab79a46d9201ec0e43e60d6ef841bf91f4c616). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezon...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/20559#discussion_r167389708

--- Diff: python/pyspark/sql/types.py ---
@@ -1709,6 +1709,15 @@ def _check_dataframe_convert_date(pdf, schema):
     return pdf

+def _get_local_timezone():
+    """ Get local timezone from environment vi pytz, or dateutil. """
+    from pyspark.sql.utils import require_minimum_pandas_version
+    require_minimum_pandas_version()
--- End diff --

Actually, it isn't needed here. I'll remove it. Thanks.
[GitHub] spark pull request #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezon...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/20559#discussion_r167389710

--- Diff: python/pyspark/sql/tests.py ---
@@ -4124,7 +4126,7 @@ def test_vectorized_udf_timestamps(self):
         data = [(0, datetime(1969, 1, 1, 1, 1, 1)),
                 (1, datetime(2012, 2, 2, 2, 2, 2)),
                 (2, None),
-                (3, datetime(2100, 3, 3, 3, 3, 3))]
+                (3, datetime(2100, 4, 4, 4, 4, 4))]
--- End diff --

I'll revert it.
[GitHub] spark pull request #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezon...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/20559#discussion_r167389709

--- Diff: python/pyspark/sql/types.py ---
@@ -1709,6 +1709,15 @@ def _check_dataframe_convert_date(pdf, schema):
     return pdf

+def _get_local_timezone():
+    """ Get local timezone from environment vi pytz, or dateutil. """
+    from pyspark.sql.utils import require_minimum_pandas_version
+    require_minimum_pandas_version()
+
+    import os
+    return os.environ.get('TZ', 'dateutil/:')
--- End diff --

Sure, I'll add some comments.
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19788 Can one of the admins verify this patch?
[GitHub] spark pull request #20563: [SPARK-23186][SQL][BRANCH-2.2] Initialize DriverM...
Github user dongjoon-hyun closed the pull request at: https://github.com/apache/spark/pull/20563
[GitHub] spark issue #20563: [SPARK-23186][SQL][BRANCH-2.2] Initialize DriverManager ...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/20563 Thank you so much!
[GitHub] spark issue #20565: SPAR[SPARK-23379][SQL] remove redundant metastore access
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20565 Merged build finished. Test PASSed.
[GitHub] spark issue #20565: SPAR[SPARK-23379][SQL] remove redundant metastore access
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20565 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87276/ Test PASSed.
[GitHub] spark issue #20565: SPAR[SPARK-23379][SQL] remove redundant metastore access
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20565 **[Test build #87276 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87276/testReport)** for PR 20565 at commit [`71df831`](https://github.com/apache/spark/commit/71df831d9da133bfc3881c278632ec932e762924). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20561 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/763/ Test PASSed.
[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20561 Merged build finished. Test PASSed.
[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20561 **[Test build #87277 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87277/testReport)** for PR 20561 at commit [`8dab79a`](https://github.com/apache/spark/commit/8dab79a46d9201ec0e43e60d6ef841bf91f4c616).
[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20561#discussion_r167387192

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java ---
@@ -98,10 +99,20 @@ public UnsafeKVExternalSorter(
         numElementsForSpillThreshold,
         canUseRadixSort);
     } else {
-      // The array will be used to do in-place sort, which require half of the space to be empty.
-      // Note: each record in the map takes two entries in the array, one is record pointer,
-      // another is the key prefix.
-      assert(map.numKeys() * 2 <= map.getArray().size() / 2);
+      // `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but
+      // `UnsafeInMemorySorter`'s point array need to hold all the entries. Since `BytesToBytesMap`
+      // can have duplicated keys, here we need a check to make sure the point array can hold
+      // all the entries in `BytesToBytesMap`.
+      final LongArray pointArray;
+      // The point array will be used to do in-place sort, which require half of the space to be
+      // empty. Note: each record in the map takes two entries in the point array, one is record
+      // pointer, another is the key prefix.
+      if (map.numValues() > map.getArray().size() / 4) {
+        pointArray = map.allocateArray(map.numValues() * 4);
--- End diff --

`map.allocateArray` will trigger other consumers to spill if memory is not enough. If the allocation still fails, there is nothing we can do; we just let the execution fail.
[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20561#discussion_r167387138

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala ---
@@ -205,4 +206,42 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
       spill = true
     )
   }
+
+  test("SPARK-23376: Create UnsafeKVExternalSorter with BytesToByteMap having duplicated keys") {
+    val memoryManager = new TestMemoryManager(new SparkConf())
+    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
+    val map = new BytesToBytesMap(taskMemoryManager, 64, taskMemoryManager.pageSizeBytes())
+
+    // Key/value are a unsafe rows with a single int column
+    val schema = new StructType().add("i", IntegerType)
+    val key = new UnsafeRow(1)
+    key.pointTo(new Array[Byte](32), 32)
+    key.setInt(0, 1)
+    val value = new UnsafeRow(1)
+    value.pointTo(new Array[Byte](32), 32)
+    value.setInt(0, 2)
+
+    for (_ <- 1 to 65) {
+      val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
+      loc.append(
+        key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
+        value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
+    }
+
+    // Make sure we can successfully create a UnsafeKVExternalSorter with a `BytesToBytesMap`
+    // which has duplicated keys and the number of entries exceeds its capacity.
--- End diff --

Yes, we use `BytesToBytesMap` to build the broadcast join hash relation, which may have duplicated keys. I only create a new pointer array if the existing one is not big enough, so we won't have a performance regression for aggregates.
[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20561#discussion_r167387073

--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java ---
@@ -98,10 +99,20 @@ public UnsafeKVExternalSorter(
         numElementsForSpillThreshold,
         canUseRadixSort);
     } else {
-      // The array will be used to do in-place sort, which require half of the space to be empty.
-      // Note: each record in the map takes two entries in the array, one is record pointer,
-      // another is the key prefix.
-      assert(map.numKeys() * 2 <= map.getArray().size() / 2);
+      // `BytesToBytesMap`'s point array is only guaranteed to hold all the distinct keys, but
+      // `UnsafeInMemorySorter`'s point array need to hold all the entries. Since `BytesToBytesMap`
--- End diff --

Yeah, but it's not trivial; I'd like to do it later. The required change I can think of: `BytesToBytesMap` is actually a key -> list map, and we need to provide a way to iterate over key -> list instead of key -> value.
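The sizing arithmetic behind the diff above: each record occupies two slots in the pointer array (record pointer plus key prefix), and the in-place sort requires half the array to be free, hence the factor of four. A small model of the check (function names are invented for illustration):

```python
def pointer_array_capacity_needed(num_values):
    # Two slots per record (pointer + key prefix), times two again so
    # that half of the array stays empty for the in-place sort: 2*2 = 4.
    return num_values * 4

def needs_new_array(num_values, current_capacity):
    # Reuse the map's existing array only if it can hold ALL entries,
    # including duplicated keys -- the condition from the diff:
    # map.numValues() > map.getArray().size() / 4
    return num_values > current_capacity // 4

# The regression scenario from the test: 65 duplicated-key entries in a
# map whose array holds 256 longs (room for only 64 sortable records).
must_grow = needs_new_array(num_values=65, current_capacity=256)
```

This matches why the old `assert(map.numKeys() * 2 <= map.getArray().size() / 2)` was wrong for duplicated keys: it counted distinct keys, not entries.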
[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20490 LGTM
[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167386197

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -116,21 +118,45 @@ object DataWritingSparkTask extends Logging {
   def run(
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
-      iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+      iter: Iterator[InternalRow],
+      useCommitCoordinator: Boolean): WriterCommitMessage = {
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)

     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
+        val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
+        if (commitAuthorized) {
+          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
+          dataWriter.commit()
+
+        } else {
+          val message = s"Stage $stageId, task $partId.$attemptId: driver did not authorize commit"
--- End diff --

`authorize to commit`
[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167386169

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -116,21 +118,45 @@ object DataWritingSparkTask extends Logging {
   def run(
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
-      iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+      iter: Iterator[InternalRow],
+      useCommitCoordinator: Boolean): WriterCommitMessage = {
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)

     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
+        val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
+        if (commitAuthorized) {
+          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
+          dataWriter.commit()
+
--- End diff --

nit: remove unnecessary blank line
[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167386125

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +88,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
  * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
  * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
  *
- * Note that, one partition may have multiple committed data writers because of speculative tasks.
- * Spark will pick the first successful one and get its commit message. Implementations should be
- * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
- * writer can commit, or have a way to clean up the data of already-committed writers.
+ * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+ * Spark uses the commit coordinator to allow only one attempt to commit. Implementations can
+ * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
+ * attempts may have committed successfully and all successful commit messages are passed to this
--- End diff --

`... committed successfully, and Spark will pick the commit message that arrives at driver side first.`
[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167386061 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java --- @@ -78,10 +88,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {} * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it. * - * Note that, one partition may have multiple committed data writers because of speculative tasks. - * Spark will pick the first successful one and get its commit message. Implementations should be - * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data - * writer can commit, or have a way to clean up the data of already-committed writers. + * Note that speculative execution may cause multiple tasks to run for a partition. By default, + * Spark uses the commit coordinator to allow only one attempt to commit. Implementations can --- End diff -- nit: `only one` -> `at most one` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20563: [SPARK-23186][SQL][BRANCH-2.2] Initialize DriverManager ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20563 thanks, merging to 2.2! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167385858 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -37,22 +100,129 @@ case class DataSourceV2Relation( } override def newInstance(): DataSourceV2Relation = { -copy(output = output.map(_.newInstance())) +// projection is used to maintain id assignment. +// if projection is not set, use output so the copy is not equal to the original +copy(projection = projection.map(_.newInstance())) } } /** * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical * to the non-streaming relation. */ -class StreamingDataSourceV2Relation( +case class StreamingDataSourceV2Relation( output: Seq[AttributeReference], -reader: DataSourceReader) extends DataSourceV2Relation(output, reader) { +reader: DataSourceReader) +extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation { override def isStreaming: Boolean = true + + override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation] + + override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) } object DataSourceV2Relation { - def apply(reader: DataSourceReader): DataSourceV2Relation = { -new DataSourceV2Relation(reader.readSchema().toAttributes, reader) + private implicit class SourceHelpers(source: DataSourceV2) { +def asReadSupport: ReadSupport = { + source match { +case support: ReadSupport => + support +case _: ReadSupportWithSchema => + // this method is only called if there is no user-supplied schema. if there is no + // user-supplied schema and ReadSupport was not implemented, throw a helpful exception. + throw new AnalysisException(s"Data source requires a user-supplied schema: $name") +case _ => + throw new AnalysisException(s"Data source is not readable: $name") + } +} + +def asReadSupportWithSchema: ReadSupportWithSchema = { + source match { +case support: ReadSupportWithSchema => + support +case _: ReadSupport => --- End diff -- another concern is: this check should be done ASAP so that we can fail earlier. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20564: [SPARK-23378][SQL] move setCurrentDatabase from HiveExte...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20564 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87275/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20564: [SPARK-23378][SQL] move setCurrentDatabase from HiveExte...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20564 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167385829 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -37,22 +100,129 @@ case class DataSourceV2Relation( } override def newInstance(): DataSourceV2Relation = { -copy(output = output.map(_.newInstance())) +// projection is used to maintain id assignment. +// if projection is not set, use output so the copy is not equal to the original +copy(projection = projection.map(_.newInstance())) } } /** * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical * to the non-streaming relation. */ -class StreamingDataSourceV2Relation( +case class StreamingDataSourceV2Relation( output: Seq[AttributeReference], -reader: DataSourceReader) extends DataSourceV2Relation(output, reader) { +reader: DataSourceReader) +extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation { override def isStreaming: Boolean = true + + override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation] + + override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) } object DataSourceV2Relation { - def apply(reader: DataSourceReader): DataSourceV2Relation = { -new DataSourceV2Relation(reader.readSchema().toAttributes, reader) + private implicit class SourceHelpers(source: DataSourceV2) { +def asReadSupport: ReadSupport = { + source match { +case support: ReadSupport => + support +case _: ReadSupportWithSchema => + // this method is only called if there is no user-supplied schema. if there is no + // user-supplied schema and ReadSupport was not implemented, throw a helpful exception. + throw new AnalysisException(s"Data source requires a user-supplied schema: $name") +case _ => + throw new AnalysisException(s"Data source is not readable: $name") + } +} + +def asReadSupportWithSchema: ReadSupportWithSchema = { + source match { +case support: ReadSupportWithSchema => + support +case _: ReadSupport => --- End diff -- This is different from before: see https://github.com/apache/spark/pull/20387/files#diff-f70bda59304588cc3abfa3a9840653f4L214 Even if `userSpecifiedSchema` is not none, it's still allowed to have `ReadSupport`, as long as its reader's schema is same as the user specified schema. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
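The `asReadSupport` / `asReadSupportWithSchema` helpers under discussion are a pattern-match dispatch over marker traits. Below is a stripped-down, Spark-free sketch of the rule cloud-fan describes: a plain `ReadSupport` source may still be used with a user-specified schema, provided the schema it infers matches the requested one. The traits, `name` field, and string-based "schema" here are toy stand-ins, not Spark's actual v2 interfaces.

```scala
// Toy stand-ins for the v2 marker traits (the real ones live in
// org.apache.spark.sql.sources.v2 and work with StructType, not Seq[String]).
trait DataSourceV2 { def name: String }
trait ReadSupport extends DataSourceV2 { def readSchema(): Seq[String] } // infers its own schema
trait ReadSupportWithSchema extends DataSourceV2                          // accepts a user schema

case class AnalysisException(msg: String) extends Exception(msg)

// Dispatch over the traits, done eagerly so that unsupported combinations
// fail at analysis time rather than deep inside planning.
def resolveSchema(source: DataSourceV2, userSchema: Option[Seq[String]]): Seq[String] =
  (source, userSchema) match {
    case (_: ReadSupportWithSchema, Some(schema)) =>
      schema // the source accepts the user's schema directly
    case (s: ReadSupport, Some(schema)) =>
      // a ReadSupport source is still usable with a user-specified schema,
      // but only if the schema it would read is identical to the requested one
      if (s.readSchema() == schema) schema
      else throw AnalysisException(s"${source.name} does not allow user-specified schemas")
    case (s: ReadSupport, None) =>
      s.readSchema() // no user schema: the source infers its own
    case (_: ReadSupportWithSchema, None) =>
      throw AnalysisException(s"Data source requires a user-supplied schema: ${source.name}")
    case _ =>
      throw AnalysisException(s"Data source is not readable: ${source.name}")
  }
```

Ordering the `ReadSupportWithSchema` case first mirrors the "prefer the schema-aware path when a user schema exists" behavior the thread is debating.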
[GitHub] spark issue #20564: [SPARK-23378][SQL] move setCurrentDatabase from HiveExte...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20564 **[Test build #87275 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87275/testReport)** for PR 20564 at commit [`f1acb5a`](https://github.com/apache/spark/commit/f1acb5ad9a12df11faf082144c676082df647ff9). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167385722 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,80 @@ package org.apache.spark.sql.execution.datasources.v2 +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Seq[AttributeReference], +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { --- End diff -- nit: may be more clear to call it `userSpecifiedSchema` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20387 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87274/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20387 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20387 **[Test build #87274 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87274/testReport)** for PR 20387 at commit [`ce5f40d`](https://github.com/apache/spark/commit/ce5f40d6a512874e2dd45bab9256f77ff74e628b). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20565: SPAR[SPARK-23379][SQL] remove redundant metastore access
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20565 **[Test build #87276 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87276/testReport)** for PR 20565 at commit [`71df831`](https://github.com/apache/spark/commit/71df831d9da133bfc3881c278632ec932e762924). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20565: SPAR[SPARK-23379][SQL] remove redundant metastore access
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20565 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/762/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20565: SPAR[SPARK-23379][SQL] remove redundant metastore access
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20565 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20565: SPAR[SPARK-23379][SQL] remove redundant metastore...
GitHub user liufengdb opened a pull request: https://github.com/apache/spark/pull/20565 SPAR[SPARK-23379][SQL] remove redundant metastore access ## What changes were proposed in this pull request? If the target database name is as same as the current database, we should be able to skip one metastore access. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/liufengdb/spark remove-redundant Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20565.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20565 commit f1acb5ad9a12df11faf082144c676082df647ff9 Author: Feng Liu Date: 2018-02-10T00:18:44Z some commit c29aa3ecb78dc345bb2ad1e5cbaf29d3fdb3a803 Author: Feng Liu Date: 2018-02-10T01:10:53Z init --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20564: [SPARK-23378][SQL] move setCurrentDatabase from H...
Github user liufengdb commented on a diff in the pull request: https://github.com/apache/spark/pull/20564#discussion_r167381054 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -1107,11 +1107,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } } -// Note: Before altering table partitions in Hive, you *must* set the current database -// to the one that contains the table of interest. Otherwise you will end up with the -// most helpful error message ever: "Unable to alter partition. alter is not possible." -// See HIVE-2742 for more detail. -client.setCurrentDatabase(db) --- End diff -- Sorry, I meant `a special case`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20564: [SPARK-23378][SQL] move setCurrentDatabase from H...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/20564#discussion_r167380287 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala --- @@ -1107,11 +1107,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } } -// Note: Before altering table partitions in Hive, you *must* set the current database -// to the one that contains the table of interest. Otherwise you will end up with the -// most helpful error message ever: "Unable to alter partition. alter is not possible." -// See HIVE-2742 for more detail. -client.setCurrentDatabase(db) --- End diff -- Can we have a test case for the exception? > This removes the exception that some calls from HiveExternalCatalog can reset the current database in the hive client as a side effect. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20407: [SPARK-23124][SQL] Allow to disable BroadcastNest...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20407#discussion_r167378911 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -156,6 +156,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val ALLOW_NESTEDJOIN_FALLBACK = buildConf("spark.sql.join.broadcastJoinFallback.enabled") +.internal() +.doc("When true (default), if the other options are not available, fallback to try and use " + + "BroadcastNestedLoopJoin as join strategy. This can cause OOM which can be a problem " + + "in some scenarios, eg. when running the thriftserver. Turn to false to disable it: an " + + "AnalysisException will be thrown.") --- End diff -- I do not think this can resolve the issue. The fix is in a wrong direction. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20564: [SPARK-23378][SQL] move setCurrentDatabase from HiveExte...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20564 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/761/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20564: [SPARK-23378][SQL] move setCurrentDatabase from HiveExte...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20564 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20562: [SPARK-23275][SQL] fix the thread leaking in hive...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20562 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20564: [SPARK-23378][SQL] move setCurrentDatabase from HiveExte...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20564 **[Test build #87275 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87275/testReport)** for PR 20564 at commit [`f1acb5a`](https://github.com/apache/spark/commit/f1acb5ad9a12df11faf082144c676082df647ff9). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20562: [SPARK-23275][SQL] fix the thread leaking in hive/tests
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/20562 Thanks! Merged to master/2.3 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20564: [SPARK-23378][SQL] move setCurrentDatabase from H...
GitHub user liufengdb opened a pull request: https://github.com/apache/spark/pull/20564 [SPARK-23378][SQL] move setCurrentDatabase from HiveExternalCatalog to HiveClientImpl ## What changes were proposed in this pull request? This enforces the rule that no calls from `HiveExternalCatalog` reset the current database in the hive client, except the `setCurrentDatabase` method. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/liufengdb/spark move Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20564.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20564 commit f1acb5ad9a12df11faf082144c676082df647ff9 Author: Feng Liu Date: 2018-02-10T00:18:44Z some --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20550: [MINOR][HIVE] Typo fixes
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20550 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20550: [MINOR][HIVE] Typo fixes
Github user srowen commented on the issue: https://github.com/apache/spark/pull/20550 Merged to master/2.3 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20550: [MINOR][HIVE] Typo fixes
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20550 **[Test build #4090 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4090/testReport)** for PR 20550 at commit [`dcbb4da`](https://github.com/apache/spark/commit/dcbb4da3b2501d74deb43df8b879b5e75154a51b). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20563: [SPARK-23186][SQL][BRANCH-2.2] Initialize DriverManager ...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/20563 cc @cloud-fan and @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20387 **[Test build #87274 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87274/testReport)** for PR 20387 at commit [`ce5f40d`](https://github.com/apache/spark/commit/ce5f40d6a512874e2dd45bab9256f77ff74e628b). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20387 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20387 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/760/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167374952 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,131 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, --- End diff -- I mentioned this below, but I should point it out on this thread, too: it is not correct to pass output to this relation. There's no guarantee that output will match the requested projection exactly, so in addition to the problem of leaking v2 details in the planner, this would make it easy to build a relation that doesn't correctly report its output. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20563: [SPARK-23186][SQL][BRANCH-2.2] Initialize DriverManager ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20563 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167374648 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,131 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly --- End diff -- Removing it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20563: [SPARK-23186][SQL][BRANCH-2.2] Initialize DriverManager ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20563 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87273/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167374579 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,131 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes --- End diff -- I don't think it is correct to make `output` a constructor parameter here. The v2 read API allows implementations to return rows with a different schema than the one requested, so you don't know whether the projection and the output will actually match until you push the projection and look at the schema the reader returns. If `output` were a constructor parameter, then it would be easy to accidentally create instances where the `output` doesn't match the actual rows returned by the source. That's why the current code uses `projection` to pass the requested columns, and always sets `output` correctly. To make the guarantee that the column ids don't change, we don't strictly need `output` to be a constructor param. In fact, right now the only time this matters is when the projection isn't set. Otherwise, the ids are taken from the projection. I've considered a couple of options, like caching the conversion from schema to attributes, but I think the easiest option is to make sure that `projection` is always set. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
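The concern in the review comment above — a v2 source may report a schema that differs from the requested projection, so `output` must be derived from the reported schema while reusing projection attribute ids — can be sketched with a simplified, hypothetical model. `Attr`, `nextId`, and `resolveOutput` below are illustrative stand-ins for Spark's `AttributeReference`/`ExprId` machinery, not the real classes:

```scala
// Illustrative model: resolve output against the schema the reader actually
// reports. Columns that were in the projection keep their existing ids;
// anything else the source adds gets a fresh id.
case class Attr(name: String, id: Int)

var nextId = 100
def newAttr(name: String): Attr = { nextId += 1; Attr(name, nextId) }

def resolveOutput(reported: Seq[String], projection: Option[Seq[Attr]]): Seq[Attr] =
  projection match {
    case Some(attrs) =>
      val byName = attrs.map(a => a.name -> a).toMap
      reported.map(n => byName.getOrElse(n, newAttr(n)))
    case None =>
      reported.map(newAttr)
  }

val projected = Seq(Attr("a", 1), Attr("b", 2))
// the source ignores part of the request: it drops "b" and adds "c"
val output = resolveOutput(Seq("a", "c"), Some(projected))
println(output) // "a" keeps id 1; "c" is minted with a fresh id
```

If `output` were instead fixed at construction time from the projection, this mismatch ("a", "c" vs. "a", "b") could go unnoticed, which is the failure mode the comment describes.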
[GitHub] spark issue #20563: [SPARK-23186][SQL][BRANCH-2.2] Initialize DriverManager ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20563 **[Test build #87273 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87273/testReport)** for PR 20563 at commit [`aeb188c`](https://github.com/apache/spark/commit/aeb188cb05187ae61cb4084beae9234e9c4fb8f8). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20562: [SPARK-23275][SQL] fix the thread leaking in hive/tests
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20562 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87272/ Test PASSed.
[GitHub] spark issue #20562: [SPARK-23275][SQL] fix the thread leaking in hive/tests
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20562 Merged build finished. Test PASSed.
[GitHub] spark issue #20562: [SPARK-23275][SQL] fix the thread leaking in hive/tests
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20562 **[Test build #87272 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87272/testReport)** for PR 20562 at commit [`4e10f34`](https://github.com/apache/spark/commit/4e10f34ebb17940a1f0fd54ce26cb16d06320770). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20387#discussion_r167369570 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -17,17 +17,131 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExprId} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, SupportsPushDownCatalystFilters, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsReportStatistics} +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType case class DataSourceV2Relation( -output: Seq[AttributeReference], -reader: DataSourceReader) - extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder { +source: DataSourceV2, +options: Map[String, String], +projection: Option[Seq[AttributeReference]] = None, +filters: Option[Seq[Expression]] = None, +userSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { + + override def simpleString: String = { +s"DataSourceV2Relation(source=$sourceName, " + + s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " + + s"filters=[${pushedFilters.mkString(", ")}], options=$options)" + } + + override lazy val schema: StructType = reader.readSchema() + + override lazy val output: Seq[AttributeReference] = { +projection match { + case Some(attrs) => +// use the projection attributes to avoid assigning new ids. fields that are not projected +// will be assigned new ids, which is okay because they are not projected. +val attrMap = attrs.map(a => a.name -> a).toMap +schema.map(f => attrMap.getOrElse(f.name, + AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())) + case _ => +schema.toAttributes +} + } + + private lazy val v2Options: DataSourceOptions = { +// ensure path and table options are set correctly +val updatedOptions = new mutable.HashMap[String, String] +updatedOptions ++= options + +new DataSourceOptions(options.asJava) + } + + private val sourceName: String = { +source match { + case registered: DataSourceRegister => +registered.shortName() + case _ => +source.getClass.getSimpleName +} + } + + lazy val ( + reader: DataSourceReader, + unsupportedFilters: Seq[Expression], + pushedFilters: Seq[Expression]) = { +val newReader = userSchema match { + case Some(s) => +asReadSupportWithSchema.createReader(s, v2Options) + case _ => +asReadSupport.createReader(v2Options) +} + +projection.foreach { attrs => + DataSourceV2Relation.pushRequiredColumns(newReader, attrs.toStructType) +} + +val (remainingFilters, pushedFilters) = filters match { + case Some(filterSeq) => +DataSourceV2Relation.pushFilters(newReader, filterSeq) + case _ => +(Nil, Nil) +} + +(newReader, remainingFilters, pushedFilters) + } + + private lazy val asReadSupport: ReadSupport = { --- End diff -- I think the implementation is pretty clear:
```scala
if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) {
  ...
} else {
  loadV1Source(paths: _*)
}
```
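The dispatch quoted in the comment above reduces to a capability check on the source instance. A minimal, self-contained sketch (the trait and class names mirror the Spark interfaces but are local stand-ins, and the v1 fallback is replaced by a plain string result for illustration):

```scala
// Illustrative capability check: only sources implementing one of the v2 read
// interfaces take the v2 path; every other source falls back to the v1 path.
trait ReadSupport
trait ReadSupportWithSchema

def chosenPath(ds: Any): String = ds match {
  case _: ReadSupport | _: ReadSupportWithSchema => "v2"
  case _                                         => "v1"
}

class V2Source extends ReadSupport
class LegacySource

println(chosenPath(new V2Source))    // v2
println(chosenPath(new LegacySource)) // v1
```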
[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20490 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87269/ Test PASSed.
[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20490 Merged build finished. Test PASSed.
[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20490 **[Test build #87269 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87269/testReport)** for PR 20490 at commit [`ec96856`](https://github.com/apache/spark/commit/ec968563605f961d3d874913de51265683a8c132). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20351: [SPARK-23014][SS] Fully remove V1 memory sink.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20351 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87271/ Test FAILed.
[GitHub] spark issue #20351: [SPARK-23014][SS] Fully remove V1 memory sink.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20351 Merged build finished. Test FAILed.
[GitHub] spark issue #20351: [SPARK-23014][SS] Fully remove V1 memory sink.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20351 **[Test build #87271 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87271/testReport)** for PR 20351 at commit [`efcf03d`](https://github.com/apache/spark/commit/efcf03d47bda6c9e0c797d7415d20ae2534db393). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class MemoryStreamDataReaderFactory(records: Array[UnsafeRow])`
[GitHub] spark issue #20519: [Spark-23240][python] Don't let python site customizatio...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20519 Merged build finished. Test PASSed.
[GitHub] spark issue #20519: [Spark-23240][python] Don't let python site customizatio...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20519 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87267/ Test PASSed.
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user stoader commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r167359005 --- Diff: core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.metrics.sink + +import java.net.URI +import java.util +import java.util.Properties +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.util.Try + +import com.codahale.metrics._ +import io.prometheus.client.CollectorRegistry +import io.prometheus.client.dropwizard.DropwizardExports + +import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkEnv} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.METRICS_NAMESPACE +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp + + +private[spark] class PrometheusSink( + val property: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager) + extends Sink with Logging { + + protected class Reporter(registry: MetricRegistry) +extends ScheduledReporter( + registry, + "prometheus-reporter", + MetricFilter.ALL, + TimeUnit.SECONDS, + TimeUnit.MILLISECONDS) { + +val defaultSparkConf: SparkConf = new SparkConf(true) + +override def report( + gauges: util.SortedMap[String, Gauge[_]], + counters: util.SortedMap[String, Counter], + histograms: util.SortedMap[String, Histogram], + meters: util.SortedMap[String, Meter], + timers: util.SortedMap[String, Timer]): Unit = { + + // SparkEnv may become available only after metrics sink creation thus retrieving + // SparkConf from spark env here and not during the creation/initialisation of PrometheusSink. 
+ val sparkConf: SparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(defaultSparkConf) + + val metricsNamespace: Option[String] = sparkConf.get(METRICS_NAMESPACE) + val sparkAppId: Option[String] = sparkConf.getOption("spark.app.id") + val executorId: Option[String] = sparkConf.getOption("spark.executor.id") + + logInfo(s"metricsNamespace=$metricsNamespace, sparkAppId=$sparkAppId, " + +s"executorId=$executorId") + + val role: String = (sparkAppId, executorId) match { +case (Some(_), Some(SparkContext.DRIVER_IDENTIFIER)) => "driver" +case (Some(_), Some(_)) => "executor" +case _ => "shuffle" + } + + val job: String = role match { +case "driver" => metricsNamespace.getOrElse(sparkAppId.get) +case "executor" => metricsNamespace.getOrElse(sparkAppId.get) +case _ => metricsNamespace.getOrElse("shuffle") + } + logInfo(s"role=$role, job=$job") + + val groupingKey: Map[String, String] = (role, executorId) match { +case ("driver", _) => Map("role" -> role) +case ("executor", Some(id)) => Map ("role" -> role, "number" -> id) +case _ => Map("role" -> role) + } + + + pushGateway.pushAdd(pushRegistry, job, groupingKey.asJava, +s"${System.currentTimeMillis}") + +} + + } + + val DEFAULT_PUSH_PERIOD: Int = 10 + val DEFAULT_PUSH_PERIOD_UNIT: TimeUnit = TimeUnit.SECONDS + val DEFAULT_PUSHGATEWAY_ADDRESS: String = "127.0.0.1:9091" + val DEFAULT_PUSHGATEWAY_ADDRESS_PROTOCOL: String = "http" + + val KEY_PUSH_PERIOD = "period" + val KEY_PUSH_PERIOD_UNIT = "unit" + val KEY_PUSHGATEWAY_ADDRESS = "pushgateway-address" + val
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user stoader commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r167359036 --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java --- @@ -0,0 +1,178 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics.prometheus.client.exporter; + +import java.io.IOException; +import java.io.Writer; +import java.util.Enumeration; + +import io.prometheus.client.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TextFormatWithTimestamp { +private static final Logger logger = LoggerFactory.getLogger(TextFormatWithTimestamp.class); + +/** + * Content-type for text version 0.0.4. + */ +public static final String CONTENT_TYPE_004 = "text/plain; version=0.0.4; charset=utf-8"; + +private static StringBuilder jsonMessageLogBuilder = new StringBuilder(); + +public static void write004(Writer writer, +Enumeration mfs)throws IOException { +write004(writer, mfs, null); +} + +/** + * Write out the text version 0.0.4 of the given MetricFamilySamples. + */ +public static void write004(Writer writer,Enumeration mfs, +String timestamp) throws IOException { +/* See http://prometheus.io/docs/instrumenting/exposition_formats/ + * for the output format specification. */ +while(mfs.hasMoreElements()) { +Collector.MetricFamilySamples metricFamilySamples = mfs.nextElement(); --- End diff -- This class has been refactored. 
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user stoader commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r167359067 --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java --- @@ -0,0 +1,320 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics.prometheus.client.exporter; + +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.net.URL; +import java.net.URLEncoder; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; + +/** + * Export metrics via the Prometheus Pushgateway. + * + * The Prometheus Pushgateway exists to allow ephemeral and + * batch jobs to expose their metrics to Prometheus. + * Since these kinds of jobs may not exist long enough to be scraped, + * they can instead push their metrics to a Pushgateway. + * This class allows pushing the contents of a {@link CollectorRegistry} to + * a Pushgateway. 
+ * + * Example usage: + * + * {@code + * void executeBatchJob() throws Exception { + * CollectorRegistry registry = new CollectorRegistry(); + * Gauge duration = Gauge.build() + * .name("my_batch_job_duration_seconds") + * .help("Duration of my batch job in seconds.") + * .register(registry); + * Gauge.Timer durationTimer = duration.startTimer(); + * try { + * // Your code here. + * + * // This is only added to the registry after success, + * // so that a previous success in the Pushgateway isn't overwritten on failure. + * Gauge lastSuccess = Gauge.build() + * .name("my_batch_job_last_success") + * .help("Last time my batch job succeeded, in unixtime.") + * .register(registry); + * lastSuccess.setToCurrentTime(); + * } finally { + * durationTimer.setDuration(); + * PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("127.0.0.1:9091"); + * pg.pushAdd(registry, "my_batch_job"); + * } + * } + * } + * + * + * See https://github.com/prometheus/pushgateway;> + * https://github.com/prometheus/pushgateway + */ +public class PushGatewayWithTimestamp { + +private static final Logger logger = LoggerFactory.getLogger(PushGatewayWithTimestamp.class); +private final String address; +private static final int SECONDS_PER_MILLISECOND = 1000; +/** + * Construct a Pushgateway, with the given address. + * + * @param address host:port or ip:port of the Pushgateway. + */ +public PushGatewayWithTimestamp(String address) { +this.address = address; +} + +/** + * Pushes all metrics in a registry, + * replacing all those with the same job and no grouping key. + * + * This uses the PUT HTTP method. + */ +public void push(CollectorRegistry registry, String job) throws IOException { +doRequest(registry, job, null, "PUT", null); +} + +/** + * Pushes all metrics in a Collector, + * replacing all those with the same job and no grouping key. + * + * This is useful for pushing a single Gauge. + * + * This uses the PUT HTTP method. 
+ */ +public void push(Collector collector, String job) throws IOException { +CollectorRegistry registry = new CollectorRegistry(); +collector.register(registry); +push(registry, job); +} + +/** + * Pushes all metrics in a registry, + * replacing all those with the same job and grouping key. + * + * This uses the PUT HTTP method. + */ +public void push(CollectorRegistry registry, + String job, Map
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user stoader commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r167359075 --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java --- @@ -0,0 +1,320 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics.prometheus.client.exporter; + +import io.prometheus.client.Collector; +import io.prometheus.client.CollectorRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.net.URL; +import java.net.URLEncoder; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; + +/** + * Export metrics via the Prometheus Pushgateway. + * + * The Prometheus Pushgateway exists to allow ephemeral and + * batch jobs to expose their metrics to Prometheus. + * Since these kinds of jobs may not exist long enough to be scraped, + * they can instead push their metrics to a Pushgateway. + * This class allows pushing the contents of a {@link CollectorRegistry} to + * a Pushgateway. 
+ * + * Example usage: + * + * {@code + * void executeBatchJob() throws Exception { + * CollectorRegistry registry = new CollectorRegistry(); + * Gauge duration = Gauge.build() + * .name("my_batch_job_duration_seconds") + * .help("Duration of my batch job in seconds.") + * .register(registry); + * Gauge.Timer durationTimer = duration.startTimer(); + * try { + * // Your code here. + * + * // This is only added to the registry after success, + * // so that a previous success in the Pushgateway isn't overwritten on failure. + * Gauge lastSuccess = Gauge.build() + * .name("my_batch_job_last_success") + * .help("Last time my batch job succeeded, in unixtime.") + * .register(registry); + * lastSuccess.setToCurrentTime(); + * } finally { + * durationTimer.setDuration(); + * PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("127.0.0.1:9091"); + * pg.pushAdd(registry, "my_batch_job"); + * } + * } + * } + * + * + * See https://github.com/prometheus/pushgateway;> + * https://github.com/prometheus/pushgateway + */ +public class PushGatewayWithTimestamp { + +private static final Logger logger = LoggerFactory.getLogger(PushGatewayWithTimestamp.class); +private final String address; +private static final int SECONDS_PER_MILLISECOND = 1000; +/** + * Construct a Pushgateway, with the given address. + * + * @param address host:port or ip:port of the Pushgateway. + */ +public PushGatewayWithTimestamp(String address) { +this.address = address; +} + +/** + * Pushes all metrics in a registry, + * replacing all those with the same job and no grouping key. + * + * This uses the PUT HTTP method. + */ +public void push(CollectorRegistry registry, String job) throws IOException { +doRequest(registry, job, null, "PUT", null); +} + +/** + * Pushes all metrics in a Collector, + * replacing all those with the same job and no grouping key. + * + * This is useful for pushing a single Gauge. + * + * This uses the PUT HTTP method. 
+ */ +public void push(Collector collector, String job) throws IOException { +CollectorRegistry registry = new CollectorRegistry(); +collector.register(registry); +push(registry, job); +} + +/** + * Pushes all metrics in a registry, + * replacing all those with the same job and grouping key. + * + * This uses the PUT HTTP method. + */ +public void push(CollectorRegistry registry, + String job, Map
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user stoader commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r167359048 --- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java --- @@ -0,0 +1,178 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.metrics.prometheus.client.exporter; + +import java.io.IOException; +import java.io.Writer; +import java.util.Enumeration; + +import io.prometheus.client.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TextFormatWithTimestamp { +private static final Logger logger = LoggerFactory.getLogger(TextFormatWithTimestamp.class); + +/** + * Content-type for text version 0.0.4. + */ +public static final String CONTENT_TYPE_004 = "text/plain; version=0.0.4; charset=utf-8"; + +private static StringBuilder jsonMessageLogBuilder = new StringBuilder(); --- End diff -- This class has been refactored. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user stoader commented on a diff in the pull request: https://github.com/apache/spark/pull/19775#discussion_r167359010 --- Diff: core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.metrics.sink + +import java.net.URI +import java.util +import java.util.Properties +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.util.Try + +import com.codahale.metrics._ +import io.prometheus.client.CollectorRegistry +import io.prometheus.client.dropwizard.DropwizardExports + +import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkEnv} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.METRICS_NAMESPACE +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp + + +private[spark] class PrometheusSink( + val property: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager) + extends Sink with Logging { + + protected class Reporter(registry: MetricRegistry) +extends ScheduledReporter( + registry, + "prometheus-reporter", + MetricFilter.ALL, + TimeUnit.SECONDS, + TimeUnit.MILLISECONDS) { + +val defaultSparkConf: SparkConf = new SparkConf(true) + +override def report( + gauges: util.SortedMap[String, Gauge[_]], + counters: util.SortedMap[String, Counter], + histograms: util.SortedMap[String, Histogram], + meters: util.SortedMap[String, Meter], + timers: util.SortedMap[String, Timer]): Unit = { + + // SparkEnv may become available only after metrics sink creation thus retrieving + // SparkConf from spark env here and not during the creation/initialisation of PrometheusSink. 
+ val sparkConf: SparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(defaultSparkConf) + + val metricsNamespace: Option[String] = sparkConf.get(METRICS_NAMESPACE) + val sparkAppId: Option[String] = sparkConf.getOption("spark.app.id") + val executorId: Option[String] = sparkConf.getOption("spark.executor.id") + + logInfo(s"metricsNamespace=$metricsNamespace, sparkAppId=$sparkAppId, " + +s"executorId=$executorId") + + val role: String = (sparkAppId, executorId) match { +case (Some(_), Some(SparkContext.DRIVER_IDENTIFIER)) => "driver" +case (Some(_), Some(_)) => "executor" +case _ => "shuffle" + } + + val job: String = role match { +case "driver" => metricsNamespace.getOrElse(sparkAppId.get) +case "executor" => metricsNamespace.getOrElse(sparkAppId.get) +case _ => metricsNamespace.getOrElse("shuffle") + } + logInfo(s"role=$role, job=$job") + + val groupingKey: Map[String, String] = (role, executorId) match { +case ("driver", _) => Map("role" -> role) +case ("executor", Some(id)) => Map ("role" -> role, "number" -> id) +case _ => Map("role" -> role) + } + + --- End diff -- Empty line removed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
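The role/grouping-key derivation in the `PrometheusSink` code quoted above can be isolated into two small pure functions. This is a sketch, not the PR's code verbatim: it assumes, as current Spark does with `SparkContext.DRIVER_IDENTIFIER`, that the driver's executor id is the literal string `"driver"`:

```scala
// Classify the process from (appId, executorId): a driver has an app id and
// the driver identifier; an executor has an app id and a numeric id; anything
// else (e.g. the external shuffle service) has neither.
def roleFor(appId: Option[String], executorId: Option[String]): String =
  (appId, executorId) match {
    case (Some(_), Some("driver")) => "driver"
    case (Some(_), Some(_))        => "executor"
    case _                         => "shuffle"
  }

// Build the Pushgateway grouping key: executors additionally carry their id,
// so metrics from different executors do not overwrite one another.
def groupingKey(role: String, executorId: Option[String]): Map[String, String] =
  (role, executorId) match {
    case ("executor", Some(id)) => Map("role" -> role, "number" -> id)
    case _                      => Map("role" -> role)
  }

println(roleFor(Some("app-1"), Some("driver"))) // driver
println(groupingKey("executor", Some("7")))
```

Keeping the grouping key distinct per executor matters because the Pushgateway replaces all metrics sharing the same job and grouping key on each push.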
[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...
Github user stoader commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19775#discussion_r167359085

--- Diff: core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.prometheus.client.exporter;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+/**
+ * Export metrics via the Prometheus Pushgateway.
+ *
+ * The Prometheus Pushgateway exists to allow ephemeral and
+ * batch jobs to expose their metrics to Prometheus.
+ * Since these kinds of jobs may not exist long enough to be scraped,
+ * they can instead push their metrics to a Pushgateway.
+ * This class allows pushing the contents of a {@link CollectorRegistry} to
+ * a Pushgateway.
+ *
+ * Example usage:
+ *
+ * {@code
+ *   void executeBatchJob() throws Exception {
+ *     CollectorRegistry registry = new CollectorRegistry();
+ *     Gauge duration = Gauge.build()
+ *         .name("my_batch_job_duration_seconds")
+ *         .help("Duration of my batch job in seconds.")
+ *         .register(registry);
+ *     Gauge.Timer durationTimer = duration.startTimer();
+ *     try {
+ *       // Your code here.
+ *
+ *       // This is only added to the registry after success,
+ *       // so that a previous success in the Pushgateway isn't overwritten on failure.
+ *       Gauge lastSuccess = Gauge.build()
+ *           .name("my_batch_job_last_success")
+ *           .help("Last time my batch job succeeded, in unixtime.")
+ *           .register(registry);
+ *       lastSuccess.setToCurrentTime();
+ *     } finally {
+ *       durationTimer.setDuration();
+ *       PushGatewayWithTimestamp pg = new PushGatewayWithTimestamp("127.0.0.1:9091");
+ *       pg.pushAdd(registry, "my_batch_job");
+ *     }
+ *   }
+ * }
+ *
+ * See <a href="https://github.com/prometheus/pushgateway">
+ * https://github.com/prometheus/pushgateway</a>
+ */
+public class PushGatewayWithTimestamp {
+
+    private static final Logger logger = LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
+    private final String address;
+    private static final int SECONDS_PER_MILLISECOND = 1000;
+
+    /**
+     * Construct a Pushgateway, with the given address.
+     *
+     * @param address host:port or ip:port of the Pushgateway.
+     */
+    public PushGatewayWithTimestamp(String address) {
+        this.address = address;
+    }
+
+    /**
+     * Pushes all metrics in a registry,
+     * replacing all those with the same job and no grouping key.
+     *
+     * This uses the PUT HTTP method.
+     */
+    public void push(CollectorRegistry registry, String job) throws IOException {
+        doRequest(registry, job, null, "PUT", null);
+    }
+
+    /**
+     * Pushes all metrics in a Collector,
+     * replacing all those with the same job and no grouping key.
+     *
+     * This is useful for pushing a single Gauge.
+     *
+     * This uses the PUT HTTP method.
+     */
+    public void push(Collector collector, String job) throws IOException {
+        CollectorRegistry registry = new CollectorRegistry();
+        collector.register(registry);
+        push(registry, job);
+    }
+
+    /**
+     * Pushes all metrics in a registry,
+     * replacing all those with the same job and grouping key.
+     *
+     * This uses the PUT HTTP method.
+     */
+    public void push(CollectorRegistry registry,
+        String job, Map
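The diff is truncated here, so the grouping-key variant's body (and the `doRequest` helper it delegates to) is not visible in this excerpt. As background for reviewers: the Pushgateway addresses a push by a URL path of the form `/metrics/job/<job>{/<label>/<value>}`, which is what the grouping key ends up encoding. A stdlib-only sketch of building such a path (the `metricsPath` helper is hypothetical, not code from the PR, and the real client may encode label names and values differently):

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

public class PushUrlSketch {
    // Illustrative only: builds the Pushgateway metrics path
    // /metrics/job/<job>{/<label>/<value>} from a job name and grouping key.
    static String metricsPath(String job, Map<String, String> groupingKey)
            throws UnsupportedEncodingException {
        StringBuilder sb = new StringBuilder("/metrics/job/")
            .append(URLEncoder.encode(job, "UTF-8"));
        if (groupingKey != null) {
            for (Map.Entry<String, String> e : groupingKey.entrySet()) {
                // Each grouping-key pair becomes an extra /label/value path segment.
                sb.append('/').append(e.getKey())
                  .append('/').append(URLEncoder.encode(e.getValue(), "UTF-8"));
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> gk = new LinkedHashMap<>();
        gk.put("role", "executor");
        gk.put("number", "1");
        System.out.println(metricsPath("my_app", gk));
        // /metrics/job/my_app/role/executor/number/1
    }
}
```

This also illustrates why the sink above derives a distinct grouping key per role/executor: without it, successive pushes from different executors under the same job name would overwrite one another in the Pushgateway.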