[GitHub] spark pull request #20407: [SPARK-23124][SQL] Allow to disable BroadcastNest...

2018-02-09 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20407#discussion_r167392027
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -156,6 +156,15 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val ALLOW_NESTEDJOIN_FALLBACK = 
buildConf("spark.sql.join.broadcastJoinFallback.enabled")
+.internal()
+.doc("When true (default), if the other options are not available, 
fallback to try and use " +
+  "BroadcastNestedLoopJoin as join strategy. This can cause OOM which 
can be a problem " +
+  "in some scenarios, eg. when running the thriftserver. Turn to false 
to disable it: an " +
+  "AnalysisException will be thrown.")
--- End diff --

Why? Could you please explain what the right direction would be, then?
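
For context, a minimal sketch of how the proposed flag would be used from PySpark, assuming it were merged as written in the diff above (`spark.sql.join.broadcastJoinFallback.enabled` is only proposed in this PR, not an existing Spark option):

```python
from pyspark.sql import SparkSession

# Hypothetical usage of the *proposed* config; the key comes from the diff above.
spark = (SparkSession.builder
         .config("spark.sql.join.broadcastJoinFallback.enabled", "false")
         .getOrCreate())

# With the fallback disabled, a join that could only be planned as a
# BroadcastNestedLoopJoin would fail with an AnalysisException instead of
# risking an OOM, per the doc string proposed in the diff.
```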


---




[GitHub] spark issue #20519: [Spark-23240][python] Don't let python site customizatio...

2018-02-09 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20519
  
I am not yet fully sure how sitecustomize writes some bytes to stdout, but I believe it doesn't always happen, because we at least have a sitecustomize for Python coverage - https://github.com/apache/spark/blob/master/python/test_coverage/sitecustomize.py.

The only way I could verify this PR and https://github.com/apache/spark/pull/20424 was to manually add some bytes to stdout.

So, I still think it's a corner case. Can we instead simply flush and discard stdout right before starting the daemon?
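
A rough sketch of that suggestion, assuming a hypothetical `serve` callable stands in for the daemon loop (this is not the actual pyspark daemon code):

```python
import os
import sys

def start_daemon(serve):
    # Drain anything sitecustomize may already have written to stdout.
    sys.stdout.flush()
    # Keep a private duplicate of the original stdout fd for the worker
    # protocol, then point fd 1 at /dev/null so later stray writes cannot
    # corrupt the framed output.
    protocol_out = os.fdopen(os.dup(sys.stdout.fileno()), "wb", buffering=0)
    devnull = open(os.devnull, "w")
    os.dup2(devnull.fileno(), sys.stdout.fileno())
    serve(protocol_out)
```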


---




[GitHub] spark issue #20531: [SPARK-23352][PYTHON] Explicitly specify supported types...

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20531
  
**[Test build #87280 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87280/testReport)**
 for PR 20531 at commit 
[`07f2d78`](https://github.com/apache/spark/commit/07f2d781022a44186fe5ab4c5621d80acce0711f).


---




[GitHub] spark issue #20531: [SPARK-23352][PYTHON] Explicitly specify supported types...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20531
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20531: [SPARK-23352][PYTHON] Explicitly specify supported types...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20531
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/766/
Test PASSed.


---




[GitHub] spark pull request #20531: [SPARK-23352][PYTHON] Explicitly specify supporte...

2018-02-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20531#discussion_r167391819
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4509,23 +4523,32 @@ def weighted_mean(v, w):
 return weighted_mean
 
 def test_manual(self):
+from pyspark.sql.functions import pandas_udf, array
+
 df = self.data
 sum_udf = self.pandas_agg_sum_udf
 mean_udf = self.pandas_agg_mean_udf
-
-result1 = df.groupby('id').agg(sum_udf(df.v), 
mean_udf(df.v)).sort('id')
+mean_arr_udf = pandas_udf(
+self.pandas_agg_mean_udf.func,
--- End diff --

Hm... let's handle the type coercion stuff separately in another issue. I think we need another iteration for it.


---




[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20561
  
**[Test build #87279 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87279/testReport)**
 for PR 20561 at commit 
[`8dab79a`](https://github.com/apache/spark/commit/8dab79a46d9201ec0e43e60d6ef841bf91f4c616).


---




[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20561
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/765/
Test PASSed.


---




[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20561
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...

2018-02-09 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/20561
  
retest this please


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-09 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r167391619
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -258,54 +262,43 @@ public int read(byte[] b, int offset, int len) throws 
IOException {
 if (len == 0) {
   return 0;
 }
-stateChangeLock.lock();
-try {
-  return readInternal(b, offset, len);
-} finally {
-  stateChangeLock.unlock();
-}
-  }
-
-  /**
-   * flip the active and read ahead buffer
-   */
-  private void swapBuffers() {
-ByteBuffer temp = activeBuffer;
-activeBuffer = readAheadBuffer;
-readAheadBuffer = temp;
-  }
 
-  /**
-   * Internal read function which should be called only from read() api. 
The assumption is that
-   * the stateChangeLock is already acquired in the caller before calling 
this function.
-   */
-  private int readInternal(byte[] b, int offset, int len) throws 
IOException {
-assert (stateChangeLock.isLocked());
 if (!activeBuffer.hasRemaining()) {
-  waitForAsyncReadComplete();
-  if (readAheadBuffer.hasRemaining()) {
-swapBuffers();
-  } else {
-// The first read or activeBuffer is skipped.
-readAsync();
+  // No remaining in active buffer - lock and switch to write ahead 
buffer.
+  stateChangeLock.lock();
+  try {
 waitForAsyncReadComplete();
-if (isEndOfStream()) {
-  return -1;
+if (!readAheadBuffer.hasRemaining()) {
+  // The first read or activeBuffer is skipped.
+  readAsync();
+  waitForAsyncReadComplete();
+  if (isEndOfStream()) {
+return -1;
+  }
 }
+// Swap the newly read read ahead buffer in place of empty active 
buffer.
--- End diff --

Would it be better to use `read-ahead` in these comments, for ease of reading?


---




[GitHub] spark pull request #20559: [SPARK-23360][SQL][PYTHON] Get local timezone fro...

2018-02-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20559#discussion_r167391496
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -2867,6 +2867,35 @@ def 
test_create_dataframe_required_pandas_not_found(self):
 "d": [pd.Timestamp.now().date()]})
 self.spark.createDataFrame(pdf)
 
+# Regression test for SPARK-23360
+@unittest.skipIf(not _have_pandas, _pandas_requirement_message)
+def test_create_dateframe_from_pandas_with_dst(self):
+import pandas as pd
+from datetime import datetime
+
+pdf = pd.DataFrame({'time': [datetime(2015, 10, 31, 22, 30)]})
+
+df = self.spark.createDataFrame(pdf)
+self.assertPandasEqual(pdf, df.toPandas())
+
+orig_env_tz = os.environ.get('TZ', None)
+orig_session_tz = self.spark.conf.get('spark.sql.session.timeZone')
+try:
+tz = 'America/Los_Angeles'
+os.environ['TZ'] = tz
+time.tzset()
+self.spark.conf.set('spark.sql.session.timeZone', tz)
+
+df = self.spark.createDataFrame(pdf)
+df.show()
--- End diff --

Gentle reminder about this. It seems it's still here for a debugging purpose, I guess? :)


---




[GitHub] spark pull request #20559: [SPARK-23360][SQL][PYTHON] Get local timezone fro...

2018-02-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20559#discussion_r167391409
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1766,15 +1781,13 @@ def _check_series_convert_timestamps_localize(s, 
from_timezone, to_timezone):
 
 import pandas as pd
 from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype
-from_tz = from_timezone or 'tzlocal()'
-to_tz = to_timezone or 'tzlocal()'
+from_tz = from_timezone or _get_local_timezone()
+to_tz = to_timezone or _get_local_timezone()
 # TODO: handle nested timestamps, such as ArrayType(TimestampType())?
 if is_datetime64tz_dtype(s.dtype):
 return s.dt.tz_convert(to_tz).dt.tz_localize(None)
 elif is_datetime64_dtype(s.dtype) and from_tz != to_tz:
-# `s.dt.tz_localize('tzlocal()')` doesn't work properly when 
including NaT.
-return s.apply(lambda ts: 
ts.tz_localize(from_tz).tz_convert(to_tz).tz_localize(None)
-   if ts is not pd.NaT else pd.NaT)
+return 
s.dt.tz_localize(from_tz).dt.tz_convert(to_tz).dt.tz_localize(None)
--- End diff --

@ueshin, is it safe to remove the `if ts is not pd.NaT else pd.NaT` guard? It seems there is a small possibility of an issue with `tzlocal()`:

https://github.com/pandas-dev/pandas/blob/0.19.x/pandas/tslib.pyx#L1760
https://github.com/pandas-dev/pandas/blob/0.19.x/pandas/tslib.pyx#L54
https://github.com/dateutil/dateutil/blob/2.6.1/dateutil/tz/tz.py#L1362
https://github.com/dateutil/dateutil/blob/2.6.1/dateutil/tz/tz.py#L1408
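
A quick way to probe that question (a standalone sketch assuming pandas and dateutil are installed; not part of this PR):

```python
import pandas as pd
from dateutil import tz

s = pd.Series([pd.Timestamp("2015-10-31 22:30"), pd.NaT])

# Element-wise conversion with the explicit NaT guard being discussed.
guarded = s.apply(
    lambda ts: ts.tz_localize(tz.tzlocal()).tz_convert("UTC").tz_localize(None)
    if ts is not pd.NaT else pd.NaT)

# Vectorized conversion; .dt.tz_localize is expected to propagate NaT.
vectorized = s.dt.tz_localize(tz.tzlocal()).dt.tz_convert("UTC").dt.tz_localize(None)

# If these differ on an affected pandas version, the guard is still needed.
print(guarded.equals(vectorized))
```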


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-09 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r167391309
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -232,7 +229,9 @@ private void waitForAsyncReadComplete() throws 
IOException {
 stateChangeLock.lock();
 try {
   while (readInProgress) {
+isWaiting.set(true);
 asyncReadComplete.await();
+isWaiting.set(false);
--- End diff --

What happens if `await()` throws an exception? Is it ok not to update 
`isWaiting`?


---




[GitHub] spark issue #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezone from ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20559
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87278/
Test PASSed.


---




[GitHub] spark issue #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezone from ...

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20559
  
**[Test build #87278 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87278/testReport)**
 for PR 20559 at commit 
[`e20e9fd`](https://github.com/apache/spark/commit/e20e9fdba7fd2a4db059fe8016f0a0a60f3dd71d).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezone from ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20559
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezone from ...

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20559
  
**[Test build #87278 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87278/testReport)**
 for PR 20559 at commit 
[`e20e9fd`](https://github.com/apache/spark/commit/e20e9fdba7fd2a4db059fe8016f0a0a60f3dd71d).


---




[GitHub] spark issue #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezone from ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20559
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/764/
Test PASSed.


---




[GitHub] spark issue #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezone from ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20559
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezone from ...

2018-02-09 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/20559
  
@BryanCutler It seems like pandas handles `tzlocal()` differently than other timezones, and it might handle DST incorrectly, I guess.
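
One way to check whether they diverge (a sketch, assuming the process runs with TZ=America/Los_Angeles like the test in this PR):

```python
import pandas as pd
from dateutil import tz

# A timestamp shortly before the 2015-11-01 DST change in America/Los_Angeles.
s = pd.Series([pd.Timestamp("2015-10-31 22:30")])

named = s.dt.tz_localize("America/Los_Angeles").dt.tz_convert("UTC")
local = s.dt.tz_localize(tz.tzlocal()).dt.tz_convert("UTC")

# Ideally these match; a mismatch would reproduce the suspected DST problem.
print(named.iloc[0], local.iloc[0])
```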


---




[GitHub] spark pull request #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezon...

2018-02-09 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20559#discussion_r167390815
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1709,6 +1709,15 @@ def _check_dataframe_convert_date(pdf, schema):
 return pdf
 
 
+def _get_local_timezone():
+""" Get local timezone from environment vi pytz, or dateutil. """
--- End diff --

I modified it and added a comment. Is the comment clear enough?


---




[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20561
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20561
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87277/
Test FAILed.


---




[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20561
  
**[Test build #87277 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87277/testReport)**
 for PR 20561 at commit 
[`8dab79a`](https://github.com/apache/spark/commit/8dab79a46d9201ec0e43e60d6ef841bf91f4c616).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezon...

2018-02-09 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20559#discussion_r167389708
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1709,6 +1709,15 @@ def _check_dataframe_convert_date(pdf, schema):
 return pdf
 
 
+def _get_local_timezone():
+""" Get local timezone from environment vi pytz, or dateutil. """
+from pyspark.sql.utils import require_minimum_pandas_version
+require_minimum_pandas_version()
--- End diff --

Actually, it isn't needed here. I'll remove it. Thanks.


---




[GitHub] spark pull request #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezon...

2018-02-09 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20559#discussion_r167389710
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4124,7 +4126,7 @@ def test_vectorized_udf_timestamps(self):
 data = [(0, datetime(1969, 1, 1, 1, 1, 1)),
 (1, datetime(2012, 2, 2, 2, 2, 2)),
 (2, None),
-(3, datetime(2100, 3, 3, 3, 3, 3))]
+(3, datetime(2100, 4, 4, 4, 4, 4))]
--- End diff --

I'll revert it.


---




[GitHub] spark pull request #20559: [WIP][SPARK-23360][SQL][PYTHON] Get local timezon...

2018-02-09 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20559#discussion_r167389709
  
--- Diff: python/pyspark/sql/types.py ---
@@ -1709,6 +1709,15 @@ def _check_dataframe_convert_date(pdf, schema):
 return pdf
 
 
+def _get_local_timezone():
+""" Get local timezone from environment vi pytz, or dateutil. """
+from pyspark.sql.utils import require_minimum_pandas_version
+require_minimum_pandas_version()
+
+import os
+return os.environ.get('TZ', 'dateutil/:')
--- End diff --

Sure, I'll add some comments.
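
For reference, a rough sketch of what the `'dateutil/:'` fallback resolves to, assuming pandas strips the `dateutil/` prefix and passes the rest to `dateutil.tz.gettz` (treat that as an assumption here, not a documented contract):

```python
import os
from dateutil import tz

zone = os.environ.get("TZ", "dateutil/:")
if zone.startswith("dateutil/"):
    # gettz(":") falls back to the system local timezone (e.g. /etc/localtime).
    resolved = tz.gettz(zone[len("dateutil/"):])
else:
    # Otherwise $TZ holds a pytz-style name such as "America/Los_Angeles".
    resolved = zone
print(resolved)
```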


---




[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19788
  
Can one of the admins verify this patch?


---




[GitHub] spark pull request #20563: [SPARK-23186][SQL][BRANCH-2.2] Initialize DriverM...

2018-02-09 Thread dongjoon-hyun
Github user dongjoon-hyun closed the pull request at:

https://github.com/apache/spark/pull/20563


---




[GitHub] spark issue #20563: [SPARK-23186][SQL][BRANCH-2.2] Initialize DriverManager ...

2018-02-09 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/20563
  
Thank you so much!


---




[GitHub] spark issue #20565: SPAR[SPARK-23379][SQL] remove redundant metastore access

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20565
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20565: SPAR[SPARK-23379][SQL] remove redundant metastore access

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20565
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87276/
Test PASSed.


---




[GitHub] spark issue #20565: SPAR[SPARK-23379][SQL] remove redundant metastore access

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20565
  
**[Test build #87276 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87276/testReport)**
 for PR 20565 at commit 
[`71df831`](https://github.com/apache/spark/commit/71df831d9da133bfc3881c278632ec932e762924).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20561
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/763/
Test PASSed.


---




[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20561
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorter with ...

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20561
  
**[Test build #87277 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87277/testReport)**
 for PR 20561 at commit 
[`8dab79a`](https://github.com/apache/spark/commit/8dab79a46d9201ec0e43e60d6ef841bf91f4c616).


---




[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20561#discussion_r167387192
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
 ---
@@ -98,10 +99,20 @@ public UnsafeKVExternalSorter(
 numElementsForSpillThreshold,
 canUseRadixSort);
 } else {
-  // The array will be used to do in-place sort, which require half of 
the space to be empty.
-  // Note: each record in the map takes two entries in the array, one 
is record pointer,
-  // another is the key prefix.
-  assert(map.numKeys() * 2 <= map.getArray().size() / 2);
+  // `BytesToBytesMap`'s point array is only guaranteed to hold all 
the distinct keys, but
+  // `UnsafeInMemorySorter`'s point array need to hold all the 
entries. Since `BytesToBytesMap`
+  // can have duplicated keys, here we need a check to make sure the 
point array can hold
+  // all the entries in `BytesToBytesMap`.
+  final LongArray pointArray;
+  // The point array will be used to do in-place sort, which require 
half of the space to be
+  // empty. Note: each record in the map takes two entries in the 
point array, one is record
+  // pointer, another is the key prefix.
+  if (map.numValues() > map.getArray().size() / 4) {
+pointArray = map.allocateArray(map.numValues() * 4);
--- End diff --

`map.allocateArray` will trigger other consumers to spill if memory is not enough. If the allocation still fails, there is nothing we can do; we just let the execution fail.


---




[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20561#discussion_r167387138
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
 ---
@@ -205,4 +206,42 @@ class UnsafeKVExternalSorterSuite extends 
SparkFunSuite with SharedSQLContext {
   spill = true
 )
   }
+
+  test("SPARK-23376: Create UnsafeKVExternalSorter with BytesToByteMap 
having duplicated keys") {
+val memoryManager = new TestMemoryManager(new SparkConf())
+val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
+val map = new BytesToBytesMap(taskMemoryManager, 64, 
taskMemoryManager.pageSizeBytes())
+
+// Key/value are a unsafe rows with a single int column
+val schema = new StructType().add("i", IntegerType)
+val key = new UnsafeRow(1)
+key.pointTo(new Array[Byte](32), 32)
+key.setInt(0, 1)
+val value = new UnsafeRow(1)
+value.pointTo(new Array[Byte](32), 32)
+value.setInt(0, 2)
+
+for (_ <- 1 to 65) {
+  val loc = map.lookup(key.getBaseObject, key.getBaseOffset, 
key.getSizeInBytes)
+  loc.append(
+key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
+value.getBaseObject, value.getBaseOffset, value.getSizeInBytes)
+}
+
+// Make sure we can successfully create a UnsafeKVExternalSorter with 
a `BytesToBytesMap`
+// which has duplicated keys and the number of entries exceeds its 
capacity.
--- End diff --

yes, we use `BytesToBytesMap` to build the broadcast join hash relation, which may have duplicated keys. I only create a new pointer array if the existing one is not big enough, so we won't have a performance regression for aggregates.


---




[GitHub] spark pull request #20561: [SPARK-23376][SQL] creating UnsafeKVExternalSorte...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20561#discussion_r167387073
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
 ---
@@ -98,10 +99,20 @@ public UnsafeKVExternalSorter(
 numElementsForSpillThreshold,
 canUseRadixSort);
 } else {
-  // The array will be used to do in-place sort, which require half of 
the space to be empty.
-  // Note: each record in the map takes two entries in the array, one 
is record pointer,
-  // another is the key prefix.
-  assert(map.numKeys() * 2 <= map.getArray().size() / 2);
+  // `BytesToBytesMap`'s point array is only guaranteed to hold all 
the distinct keys, but
+  // `UnsafeInMemorySorter`'s point array need to hold all the 
entries. Since `BytesToBytesMap`
--- End diff --

Yeah, but it's not trivial; I'd like to do it later. The required change I can think of: `BytesToBytesMap` is actually a key -> list of values, and we need to provide a way to iterate key -> list instead of key -> value.


---




[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20490
  
LGTM


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167386197
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -116,21 +118,45 @@ object DataWritingSparkTask extends Logging {
   def run(
   writeTask: DataWriterFactory[InternalRow],
   context: TaskContext,
-  iter: Iterator[InternalRow]): WriterCommitMessage = {
-val dataWriter = writeTask.createDataWriter(context.partitionId(), 
context.attemptNumber())
+  iter: Iterator[InternalRow],
+  useCommitCoordinator: Boolean): WriterCommitMessage = {
+val stageId = context.stageId()
+val partId = context.partitionId()
+val attemptId = context.attemptNumber()
+val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
 // write the data and commit this writer.
 Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
   iter.foreach(dataWriter.write)
-  logInfo(s"Writer for partition ${context.partitionId()} is 
committing.")
-  val msg = dataWriter.commit()
-  logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+  val msg = if (useCommitCoordinator) {
+val coordinator = SparkEnv.get.outputCommitCoordinator
+val commitAuthorized = coordinator.canCommit(context.stageId(), 
partId, attemptId)
+if (commitAuthorized) {
+  logInfo(s"Writer for stage $stageId, task $partId.$attemptId is 
authorized to commit.")
+  dataWriter.commit()
+
+} else {
+  val message = s"Stage $stageId, task $partId.$attemptId: driver 
did not authorize commit"
--- End diff --

`authorize to commit`


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167386169
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -116,21 +118,45 @@ object DataWritingSparkTask extends Logging {
   def run(
   writeTask: DataWriterFactory[InternalRow],
   context: TaskContext,
-  iter: Iterator[InternalRow]): WriterCommitMessage = {
-val dataWriter = writeTask.createDataWriter(context.partitionId(), 
context.attemptNumber())
+  iter: Iterator[InternalRow],
+  useCommitCoordinator: Boolean): WriterCommitMessage = {
+val stageId = context.stageId()
+val partId = context.partitionId()
+val attemptId = context.attemptNumber()
+val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
 // write the data and commit this writer.
 Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
   iter.foreach(dataWriter.write)
-  logInfo(s"Writer for partition ${context.partitionId()} is 
committing.")
-  val msg = dataWriter.commit()
-  logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+  val msg = if (useCommitCoordinator) {
+val coordinator = SparkEnv.get.outputCommitCoordinator
+val commitAuthorized = coordinator.canCommit(context.stageId(), 
partId, attemptId)
+if (commitAuthorized) {
+  logInfo(s"Writer for stage $stageId, task $partId.$attemptId is 
authorized to commit.")
+  dataWriter.commit()
+
--- End diff --

nit: remove unnecessary blank line


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167386125
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
 ---
@@ -78,10 +88,11 @@ default void onDataWriterCommit(WriterCommitMessage 
message) {}
* failed, and {@link #abort(WriterCommitMessage[])} would be called. 
The state of the destination
* is undefined and @{@link #abort(WriterCommitMessage[])} may not be 
able to deal with it.
*
-   * Note that, one partition may have multiple committed data writers 
because of speculative tasks.
-   * Spark will pick the first successful one and get its commit message. 
Implementations should be
-   * aware of this and handle it correctly, e.g., have a coordinator to 
make sure only one data
-   * writer can commit, or have a way to clean up the data of 
already-committed writers.
+   * Note that speculative execution may cause multiple tasks to run for a 
partition. By default,
+   * Spark uses the commit coordinator to allow only one attempt to 
commit. Implementations can
+   * disable this behavior by overriding {@link #useCommitCoordinator()}. 
If disabled, multiple
+   * attempts may have committed successfully and all successful commit 
messages are passed to this
--- End diff --

`... committed successfully, and Spark will pick the commit message that arrives at the driver side first.`


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167386061
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
 ---
@@ -78,10 +88,11 @@ default void onDataWriterCommit(WriterCommitMessage 
message) {}
* failed, and {@link #abort(WriterCommitMessage[])} would be called. 
The state of the destination
* is undefined and @{@link #abort(WriterCommitMessage[])} may not be 
able to deal with it.
*
-   * Note that, one partition may have multiple committed data writers 
because of speculative tasks.
-   * Spark will pick the first successful one and get its commit message. 
Implementations should be
-   * aware of this and handle it correctly, e.g., have a coordinator to 
make sure only one data
-   * writer can commit, or have a way to clean up the data of 
already-committed writers.
+   * Note that speculative execution may cause multiple tasks to run for a 
partition. By default,
+   * Spark uses the commit coordinator to allow only one attempt to 
commit. Implementations can
--- End diff --

nit: `only one` -> `at most one`


---




[GitHub] spark issue #20563: [SPARK-23186][SQL][BRANCH-2.2] Initialize DriverManager ...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20563
  
thanks, merging to 2.2!


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167385858
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -37,22 +100,129 @@ case class DataSourceV2Relation(
   }
 
   override def newInstance(): DataSourceV2Relation = {
-copy(output = output.map(_.newInstance()))
+// projection is used to maintain id assignment.
+// if projection is not set, use output so the copy is not equal to 
the original
+copy(projection = projection.map(_.newInstance()))
   }
 }
 
 /**
  * A specialization of DataSourceV2Relation with the streaming bit set to 
true. Otherwise identical
  * to the non-streaming relation.
  */
-class StreamingDataSourceV2Relation(
+case class StreamingDataSourceV2Relation(
 output: Seq[AttributeReference],
-reader: DataSourceReader) extends DataSourceV2Relation(output, reader) 
{
+reader: DataSourceReader)
+extends LeafNode with DataSourceReaderHolder with 
MultiInstanceRelation {
   override def isStreaming: Boolean = true
+
+  override def canEqual(other: Any): Boolean = 
other.isInstanceOf[StreamingDataSourceV2Relation]
+
+  override def newInstance(): LogicalPlan = copy(output = 
output.map(_.newInstance()))
 }
 
 object DataSourceV2Relation {
-  def apply(reader: DataSourceReader): DataSourceV2Relation = {
-new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
+  private implicit class SourceHelpers(source: DataSourceV2) {
+def asReadSupport: ReadSupport = {
+  source match {
+case support: ReadSupport =>
+  support
+case _: ReadSupportWithSchema =>
+  // this method is only called if there is no user-supplied 
schema. if there is no
+  // user-supplied schema and ReadSupport was not implemented, 
throw a helpful exception.
+  throw new AnalysisException(s"Data source requires a 
user-supplied schema: $name")
+case _ =>
+  throw new AnalysisException(s"Data source is not readable: 
$name")
+  }
+}
+
+def asReadSupportWithSchema: ReadSupportWithSchema = {
+  source match {
+case support: ReadSupportWithSchema =>
+  support
+case _: ReadSupport =>
--- End diff --

Another concern: this check should be done as early as possible so that we can fail earlier.


---




[GitHub] spark issue #20564: [SPARK-23378][SQL] move setCurrentDatabase from HiveExte...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20564
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87275/
Test PASSed.


---




[GitHub] spark issue #20564: [SPARK-23378][SQL] move setCurrentDatabase from HiveExte...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20564
  
Merged build finished. Test PASSed.


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167385829
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -37,22 +100,129 @@ case class DataSourceV2Relation(
   }
 
   override def newInstance(): DataSourceV2Relation = {
-copy(output = output.map(_.newInstance()))
+// projection is used to maintain id assignment.
+// if projection is not set, use output so the copy is not equal to 
the original
+copy(projection = projection.map(_.newInstance()))
   }
 }
 
 /**
  * A specialization of DataSourceV2Relation with the streaming bit set to 
true. Otherwise identical
  * to the non-streaming relation.
  */
-class StreamingDataSourceV2Relation(
+case class StreamingDataSourceV2Relation(
 output: Seq[AttributeReference],
-reader: DataSourceReader) extends DataSourceV2Relation(output, reader) 
{
+reader: DataSourceReader)
+extends LeafNode with DataSourceReaderHolder with 
MultiInstanceRelation {
   override def isStreaming: Boolean = true
+
+  override def canEqual(other: Any): Boolean = 
other.isInstanceOf[StreamingDataSourceV2Relation]
+
+  override def newInstance(): LogicalPlan = copy(output = 
output.map(_.newInstance()))
 }
 
 object DataSourceV2Relation {
-  def apply(reader: DataSourceReader): DataSourceV2Relation = {
-new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
+  private implicit class SourceHelpers(source: DataSourceV2) {
+def asReadSupport: ReadSupport = {
+  source match {
+case support: ReadSupport =>
+  support
+case _: ReadSupportWithSchema =>
+  // this method is only called if there is no user-supplied 
schema. if there is no
+  // user-supplied schema and ReadSupport was not implemented, 
throw a helpful exception.
+  throw new AnalysisException(s"Data source requires a 
user-supplied schema: $name")
+case _ =>
+  throw new AnalysisException(s"Data source is not readable: 
$name")
+  }
+}
+
+def asReadSupportWithSchema: ReadSupportWithSchema = {
+  source match {
+case support: ReadSupportWithSchema =>
+  support
+case _: ReadSupport =>
--- End diff --

This is different from before: see 
https://github.com/apache/spark/pull/20387/files#diff-f70bda59304588cc3abfa3a9840653f4L214

Even if `userSpecifiedSchema` is not None, it's still allowed to have `ReadSupport`, as long as its reader's schema is the same as the user-specified schema.


---




[GitHub] spark issue #20564: [SPARK-23378][SQL] move setCurrentDatabase from HiveExte...

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20564
  
**[Test build #87275 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87275/testReport)**
 for PR 20564 at commit 
[`f1acb5a`](https://github.com/apache/spark/commit/f1acb5ad9a12df11faf082144c676082df647ff9).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167385722
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,80 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Seq[AttributeReference],
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
--- End diff --

nit: it may be clearer to call it `userSpecifiedSchema`


---




[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20387
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87274/
Test FAILed.


---




[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20387
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20387
  
**[Test build #87274 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87274/testReport)**
 for PR 20387 at commit 
[`ce5f40d`](https://github.com/apache/spark/commit/ce5f40d6a512874e2dd45bab9256f77ff74e628b).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20565: SPAR[SPARK-23379][SQL] remove redundant metastore access

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20565
  
**[Test build #87276 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87276/testReport)**
 for PR 20565 at commit 
[`71df831`](https://github.com/apache/spark/commit/71df831d9da133bfc3881c278632ec932e762924).


---




[GitHub] spark issue #20565: SPAR[SPARK-23379][SQL] remove redundant metastore access

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20565
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/762/
Test PASSed.


---




[GitHub] spark issue #20565: SPAR[SPARK-23379][SQL] remove redundant metastore access

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20565
  
Merged build finished. Test PASSed.


---




[GitHub] spark pull request #20565: SPAR[SPARK-23379][SQL] remove redundant metastore...

2018-02-09 Thread liufengdb
GitHub user liufengdb opened a pull request:

https://github.com/apache/spark/pull/20565

SPAR[SPARK-23379][SQL] remove redundant metastore access

## What changes were proposed in this pull request?

If the target database name is the same as the current database, we should be able to skip one metastore access.

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/liufengdb/spark remove-redundant

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20565.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20565


commit f1acb5ad9a12df11faf082144c676082df647ff9
Author: Feng Liu 
Date:   2018-02-10T00:18:44Z

some

commit c29aa3ecb78dc345bb2ad1e5cbaf29d3fdb3a803
Author: Feng Liu 
Date:   2018-02-10T01:10:53Z

init




---




[GitHub] spark pull request #20564: [SPARK-23378][SQL] move setCurrentDatabase from H...

2018-02-09 Thread liufengdb
Github user liufengdb commented on a diff in the pull request:

https://github.com/apache/spark/pull/20564#discussion_r167381054
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -1107,11 +1107,6 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
   }
 }
 
-// Note: Before altering table partitions in Hive, you *must* set the 
current database
-// to the one that contains the table of interest. Otherwise you will 
end up with the
-// most helpful error message ever: "Unable to alter partition. alter 
is not possible."
-// See HIVE-2742 for more detail.
-client.setCurrentDatabase(db)
--- End diff --

Sorry, I meant `a special case`.


---




[GitHub] spark pull request #20564: [SPARK-23378][SQL] move setCurrentDatabase from H...

2018-02-09 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/20564#discussion_r167380287
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -1107,11 +1107,6 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
   }
 }
 
-// Note: Before altering table partitions in Hive, you *must* set the 
current database
-// to the one that contains the table of interest. Otherwise you will 
end up with the
-// most helpful error message ever: "Unable to alter partition. alter 
is not possible."
-// See HIVE-2742 for more detail.
-client.setCurrentDatabase(db)
--- End diff --

Can we have a test case for the exception?
> This removes the exception that some calls from HiveExternalCatalog can 
reset the current database in the hive client as a side effect.


---




[GitHub] spark pull request #20407: [SPARK-23124][SQL] Allow to disable BroadcastNest...

2018-02-09 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20407#discussion_r167378911
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -156,6 +156,15 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
+  val ALLOW_NESTEDJOIN_FALLBACK = 
buildConf("spark.sql.join.broadcastJoinFallback.enabled")
+.internal()
+.doc("When true (default), if the other options are not available, 
fallback to try and use " +
+  "BroadcastNestedLoopJoin as join strategy. This can cause OOM which 
can be a problem " +
+  "in some scenarios, eg. when running the thriftserver. Turn to false 
to disable it: an " +
+  "AnalysisException will be thrown.")
--- End diff --

I do not think this can resolve the issue. The fix is in the wrong direction.


---




[GitHub] spark issue #20564: [SPARK-23378][SQL] move setCurrentDatabase from HiveExte...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20564
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/761/
Test PASSed.


---




[GitHub] spark issue #20564: [SPARK-23378][SQL] move setCurrentDatabase from HiveExte...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20564
  
Merged build finished. Test PASSed.


---




[GitHub] spark pull request #20562: [SPARK-23275][SQL] fix the thread leaking in hive...

2018-02-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20562


---




[GitHub] spark issue #20564: [SPARK-23378][SQL] move setCurrentDatabase from HiveExte...

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20564
  
**[Test build #87275 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87275/testReport)**
 for PR 20564 at commit 
[`f1acb5a`](https://github.com/apache/spark/commit/f1acb5ad9a12df11faf082144c676082df647ff9).


---




[GitHub] spark issue #20562: [SPARK-23275][SQL] fix the thread leaking in hive/tests

2018-02-09 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/20562
  
Thanks! Merged to master/2.3


---




[GitHub] spark pull request #20564: [SPARK-23378][SQL] move setCurrentDatabase from H...

2018-02-09 Thread liufengdb
GitHub user liufengdb opened a pull request:

https://github.com/apache/spark/pull/20564

[SPARK-23378][SQL] move setCurrentDatabase from HiveExternalCatalog to 
HiveClientImpl

## What changes were proposed in this pull request?

This enforces the rule that no calls from `HiveExternalCatalog` reset the 
current database in the hive client, except the `setCurrentDatabase` method. 

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/liufengdb/spark move

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20564.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20564


commit f1acb5ad9a12df11faf082144c676082df647ff9
Author: Feng Liu 
Date:   2018-02-10T00:18:44Z

some




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20550: [MINOR][HIVE] Typo fixes

2018-02-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20550


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20550: [MINOR][HIVE] Typo fixes

2018-02-09 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/20550
  
Merged to master/2.3


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20550: [MINOR][HIVE] Typo fixes

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20550
  
**[Test build #4090 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4090/testReport)**
 for PR 20550 at commit 
[`dcbb4da`](https://github.com/apache/spark/commit/dcbb4da3b2501d74deb43df8b879b5e75154a51b).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20563: [SPARK-23186][SQL][BRANCH-2.2] Initialize DriverManager ...

2018-02-09 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/20563
  
cc @cloud-fan and @gatorsmile 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20387
  
**[Test build #87274 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87274/testReport)**
 for PR 20387 at commit 
[`ce5f40d`](https://github.com/apache/spark/commit/ce5f40d6a512874e2dd45bab9256f77ff74e628b).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20387
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable logical ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20387
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/760/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167374952
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,131 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
--- End diff --

I mentioned this below, but I should point it out on this thread, too: it 
is not correct to pass output to this relation. There's no guarantee that 
output will match the requested projection exactly, so in addition to the 
problem of leaking v2 details in the planner, this would make it easy to build 
a relation that doesn't correctly report its output.
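
To make the mismatch concrete, here is a tiny standalone toy (the `ToyReader` name and shape are invented for the example; it is not the real `DataSourceReader` interface): a source may answer a pruning request in its own column order, so any `output` fixed at construction time can disagree with what `readSchema()` actually reports.

```scala
import org.apache.spark.sql.types._

// Toy stand-in for a v2 reader: it honours the requested column set but keeps
// its own column order, which the v2 read API allows.
class ToyReader(full: StructType) {
  private var pruned: StructType = full

  def pruneColumns(requested: StructType): Unit = {
    pruned = StructType(full.filter(f => requested.fieldNames.contains(f.name)))
  }

  def readSchema(): StructType = pruned
}

object ProjectionMismatchExample extends App {
  val reader = new ToyReader(StructType(Seq(
    StructField("a", IntegerType), StructField("b", StringType), StructField("c", LongType))))

  // Ask for (c, a); the reader answers (a, c), so output must be derived from
  // readSchema() rather than taken from a constructor argument.
  reader.pruneColumns(StructType(Seq(StructField("c", LongType), StructField("a", IntegerType))))
  println(reader.readSchema().fieldNames.mkString(", ")) // a, c
}
```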


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20563: [SPARK-23186][SQL][BRANCH-2.2] Initialize DriverManager ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20563
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167374648
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,131 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
--- End diff --

Removing it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20563: [SPARK-23186][SQL][BRANCH-2.2] Initialize DriverManager ...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20563
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87273/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167374579
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,131 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
--- End diff --

I don't think it is correct to make `output` a constructor parameter here. 
The v2 read API allows implementations to return rows with a different schema 
than the one requested, so you don't know whether the projection and the output 
will actually match until you push the projection and look at the schema the 
reader returns.

If `output` were a constructor parameter, then it would be easy to 
accidentally create instances where the `output` doesn't match the actual rows 
returned by the source. That's why the current code uses `projection` to pass 
the requested columns, and always sets `output` correctly.

To make the guarantee that the column ids don't change, we don't strictly 
need `output` to be a constructor param. In fact, right now the only time this 
matters is when the projection isn't set. Otherwise, the ids are taken from the 
projection. I've considered a couple of options, like caching the conversion 
from schema to attributes, but I think the easiest option is to make sure that 
`projection` is always set.
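
A rough sketch of the id-stability point, assuming only the catalyst `AttributeReference` and `StructType.toAttributes` APIs that the diff above already uses (the object name is invented): converting a schema to attributes mints fresh expression ids on every call, while reusing the projection's attributes keeps them stable.

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types._

object StableIdsSketch extends App {
  val schema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))

  // Each conversion assigns new ExprIds, so two copies of the "same" relation
  // would not line up if output were recomputed from the schema every time.
  val first  = schema.toAttributes
  val second = schema.toAttributes
  println(first.map(_.exprId) == second.map(_.exprId)) // false

  // Reusing existing attributes (the projection) preserves the ids; unknown
  // fields fall back to fresh attributes, as in the output derivation above.
  val reused = schema.map(f => first.find(_.name == f.name).getOrElse(
    AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()))
  println(first.map(_.exprId) == reused.map(_.exprId)) // true
}
```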


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20563: [SPARK-23186][SQL][BRANCH-2.2] Initialize DriverManager ...

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20563
  
**[Test build #87273 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87273/testReport)**
 for PR 20563 at commit 
[`aeb188c`](https://github.com/apache/spark/commit/aeb188cb05187ae61cb4084beae9234e9c4fb8f8).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20562: [SPARK-23275][SQL] fix the thread leaking in hive/tests

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20562
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87272/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20562: [SPARK-23275][SQL] fix the thread leaking in hive/tests

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20562
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20562: [SPARK-23275][SQL] fix the thread leaking in hive/tests

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20562
  
**[Test build #87272 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87272/testReport)**
 for PR 20562 at commit 
[`4e10f34`](https://github.com/apache/spark/commit/4e10f34ebb17940a1f0fd54ce26cb16d06320770).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20387: [SPARK-23203][SQL]: DataSourceV2: Use immutable l...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20387#discussion_r167369570
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -17,17 +17,131 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
-import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, ReadSupportWithSchema, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
+import org.apache.spark.sql.types.StructType
 
 case class DataSourceV2Relation(
-output: Seq[AttributeReference],
-reader: DataSourceReader)
-  extends LeafNode with MultiInstanceRelation with DataSourceReaderHolder {
+source: DataSourceV2,
+options: Map[String, String],
+projection: Option[Seq[AttributeReference]] = None,
+filters: Option[Seq[Expression]] = None,
+userSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+
+  override def simpleString: String = {
+s"DataSourceV2Relation(source=$sourceName, " +
+  s"schema=[${output.map(a => s"$a 
${a.dataType.simpleString}").mkString(", ")}], " +
+  s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
+  }
+
+  override lazy val schema: StructType = reader.readSchema()
+
+  override lazy val output: Seq[AttributeReference] = {
+projection match {
+  case Some(attrs) =>
+// use the projection attributes to avoid assigning new ids. 
fields that are not projected
+// will be assigned new ids, which is okay because they are not 
projected.
+val attrMap = attrs.map(a => a.name -> a).toMap
+schema.map(f => attrMap.getOrElse(f.name,
+  AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)()))
+  case _ =>
+schema.toAttributes
+}
+  }
+
+  private lazy val v2Options: DataSourceOptions = {
+// ensure path and table options are set correctly
+val updatedOptions = new mutable.HashMap[String, String]
+updatedOptions ++= options
+
+new DataSourceOptions(options.asJava)
+  }
+
+  private val sourceName: String = {
+source match {
+  case registered: DataSourceRegister =>
+registered.shortName()
+  case _ =>
+source.getClass.getSimpleName
+}
+  }
+
+  lazy val (
+  reader: DataSourceReader,
+  unsupportedFilters: Seq[Expression],
+  pushedFilters: Seq[Expression]) = {
+val newReader = userSchema match {
+  case Some(s) =>
+asReadSupportWithSchema.createReader(s, v2Options)
+  case _ =>
+asReadSupport.createReader(v2Options)
+}
+
+projection.foreach { attrs =>
+  DataSourceV2Relation.pushRequiredColumns(newReader, 
attrs.toStructType)
+}
+
+val (remainingFilters, pushedFilters) = filters match {
+  case Some(filterSeq) =>
+DataSourceV2Relation.pushFilters(newReader, filterSeq)
+  case _ =>
+(Nil, Nil)
+}
+
+(newReader, remainingFilters, pushedFilters)
+  }
+
+  private lazy val asReadSupport: ReadSupport = {
--- End diff --

I think the implementation is pretty clear:

```scala
if (ds.isInstanceOf[ReadSupport] || ds.isInstanceOf[ReadSupportWithSchema]) {
  ...
} else {
  loadV1Source(paths: _*)
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20490
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87269/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20490
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20490
  
**[Test build #87269 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87269/testReport)**
 for PR 20490 at commit 
[`ec96856`](https://github.com/apache/spark/commit/ec968563605f961d3d874913de51265683a8c132).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20351: [SPARK-23014][SS] Fully remove V1 memory sink.

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20351
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87271/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20351: [SPARK-23014][SS] Fully remove V1 memory sink.

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20351
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20351: [SPARK-23014][SS] Fully remove V1 memory sink.

2018-02-09 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20351
  
**[Test build #87271 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87271/testReport)**
 for PR 20351 at commit 
[`efcf03d`](https://github.com/apache/spark/commit/efcf03d47bda6c9e0c797d7415d20ae2534db393).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class MemoryStreamDataReaderFactory(records: Array[UnsafeRow])`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20519: [Spark-23240][python] Don't let python site customizatio...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20519
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20519: [Spark-23240][python] Don't let python site customizatio...

2018-02-09 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20519
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87267/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-09 Thread stoader
Github user stoader commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r167359005
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.net.URI
+import java.util
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import com.codahale.metrics._
+import io.prometheus.client.CollectorRegistry
+import io.prometheus.client.dropwizard.DropwizardExports
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext, 
SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.METRICS_NAMESPACE
+import org.apache.spark.metrics.MetricsSystem
+import 
org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp
+
+
+private[spark] class PrometheusSink(
+ val property: Properties,
+ val registry: MetricRegistry,
+ securityMgr: SecurityManager)
+  extends Sink with Logging {
+
+  protected class Reporter(registry: MetricRegistry)
+extends ScheduledReporter(
+  registry,
+  "prometheus-reporter",
+  MetricFilter.ALL,
+  TimeUnit.SECONDS,
+  TimeUnit.MILLISECONDS) {
+
+val defaultSparkConf: SparkConf = new SparkConf(true)
+
+override def report(
+ gauges: util.SortedMap[String, Gauge[_]],
+ counters: util.SortedMap[String, Counter],
+ histograms: util.SortedMap[String, Histogram],
+ meters: util.SortedMap[String, Meter],
+ timers: util.SortedMap[String, Timer]): Unit = {
+
+  // SparkEnv may become available only after metrics sink creation 
thus retrieving
+  // SparkConf from spark env here and not during the 
creation/initialisation of PrometheusSink.
+  val sparkConf: SparkConf = 
Option(SparkEnv.get).map(_.conf).getOrElse(defaultSparkConf)
+
+  val metricsNamespace: Option[String] = 
sparkConf.get(METRICS_NAMESPACE)
+  val sparkAppId: Option[String] = sparkConf.getOption("spark.app.id")
+  val executorId: Option[String] = 
sparkConf.getOption("spark.executor.id")
+
+  logInfo(s"metricsNamespace=$metricsNamespace, 
sparkAppId=$sparkAppId, " +
+s"executorId=$executorId")
+
+  val role: String = (sparkAppId, executorId) match {
+case (Some(_), Some(SparkContext.DRIVER_IDENTIFIER)) => "driver"
+case (Some(_), Some(_)) => "executor"
+case _ => "shuffle"
+  }
+
+  val job: String = role match {
+case "driver" => metricsNamespace.getOrElse(sparkAppId.get)
+case "executor" => metricsNamespace.getOrElse(sparkAppId.get)
+case _ => metricsNamespace.getOrElse("shuffle")
+  }
+  logInfo(s"role=$role, job=$job")
+
+  val groupingKey: Map[String, String] = (role, executorId) match {
+case ("driver", _) => Map("role" -> role)
+case ("executor", Some(id)) => Map ("role" -> role, "number" -> id)
+case _ => Map("role" -> role)
+  }
+
+
+  pushGateway.pushAdd(pushRegistry, job, groupingKey.asJava,
+s"${System.currentTimeMillis}")
+
+}
+
+  }
+
+  val DEFAULT_PUSH_PERIOD: Int = 10
+  val DEFAULT_PUSH_PERIOD_UNIT: TimeUnit = TimeUnit.SECONDS
+  val DEFAULT_PUSHGATEWAY_ADDRESS: String = "127.0.0.1:9091"
+  val DEFAULT_PUSHGATEWAY_ADDRESS_PROTOCOL: String = "http"
+
+  val KEY_PUSH_PERIOD = "period"
+  val KEY_PUSH_PERIOD_UNIT = "unit"
+  val KEY_PUSHGATEWAY_ADDRESS = "pushgateway-address"
+  val 

[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-09 Thread stoader
Github user stoader commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r167359036
  
--- Diff: 
core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.prometheus.client.exporter;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Enumeration;
+
+import io.prometheus.client.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TextFormatWithTimestamp {
+private static final Logger logger = 
LoggerFactory.getLogger(TextFormatWithTimestamp.class);
+
+/**
+ * Content-type for text version 0.0.4.
+ */
+public static final String CONTENT_TYPE_004 = "text/plain; 
version=0.0.4; charset=utf-8";
+
+private static StringBuilder jsonMessageLogBuilder = new 
StringBuilder();
+
+public static void write004(Writer writer,
+Enumeration<Collector.MetricFamilySamples> mfs) throws IOException {
+write004(writer, mfs, null);
+}
+
+/**
+ * Write out the text version 0.0.4 of the given MetricFamilySamples.
+ */
+public static void write004(Writer writer, Enumeration<Collector.MetricFamilySamples> mfs,
+String timestamp) throws IOException {
+/* See http://prometheus.io/docs/instrumenting/exposition_formats/
+ * for the output format specification. */
+while(mfs.hasMoreElements()) {
+Collector.MetricFamilySamples metricFamilySamples = 
mfs.nextElement();
--- End diff --

This class has been refactored.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-09 Thread stoader
Github user stoader commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r167359067
  
--- Diff: 
core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java
 ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.prometheus.client.exporter;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+/**
+ * Export metrics via the Prometheus Pushgateway.
+ * 
+ * The Prometheus Pushgateway exists to allow ephemeral and
+ * batch jobs to expose their metrics to Prometheus.
+ * Since these kinds of jobs may not exist long enough to be scraped,
+ * they can instead push their metrics to a Pushgateway.
+ * This class allows pushing the contents of a {@link CollectorRegistry} to
+ * a Pushgateway.
+ * 
+ * Example usage:
+ * 
+ * {@code
+ *   void executeBatchJob() throws Exception {
+ * CollectorRegistry registry = new CollectorRegistry();
+ * Gauge duration = Gauge.build()
+ * .name("my_batch_job_duration_seconds")
+ * .help("Duration of my batch job in seconds.")
+ * .register(registry);
+ * Gauge.Timer durationTimer = duration.startTimer();
+ * try {
+ *   // Your code here.
+ *
+ *   // This is only added to the registry after success,
+ *   // so that a previous success in the Pushgateway isn't 
overwritten on failure.
+ *   Gauge lastSuccess = Gauge.build()
+ *   .name("my_batch_job_last_success")
+ *   .help("Last time my batch job succeeded, in unixtime.")
+ *   .register(registry);
+ *   lastSuccess.setToCurrentTime();
+ * } finally {
+ *   durationTimer.setDuration();
+ *   PushGatewayWithTimestamp pg = new 
PushGatewayWithTimestamp("127.0.0.1:9091");
+ *   pg.pushAdd(registry, "my_batch_job");
+ * }
+ *   }
+ * }
+ * 
+ * 
+ * See https://github.com/prometheus/pushgateway
+ */
+public class PushGatewayWithTimestamp {
+
+private static final Logger logger = 
LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
+private final String address;
+private static final int SECONDS_PER_MILLISECOND = 1000;
+/**
+ * Construct a Pushgateway, with the given address.
+ * 
+ * @param address  host:port or ip:port of the Pushgateway.
+ */
+public PushGatewayWithTimestamp(String address) {
+this.address = address;
+}
+
+/**
+ * Pushes all metrics in a registry,
+ * replacing all those with the same job and no grouping key.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(CollectorRegistry registry, String job) throws 
IOException {
+doRequest(registry, job, null, "PUT", null);
+}
+
+/**
+ * Pushes all metrics in a Collector,
+ * replacing all those with the same job and no grouping key.
+ * 
+ * This is useful for pushing a single Gauge.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(Collector collector, String job) throws IOException {
+CollectorRegistry registry = new CollectorRegistry();
+collector.register(registry);
+push(registry, job);
+}
+
+/**
+ * Pushes all metrics in a registry,
+ * replacing all those with the same job and grouping key.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(CollectorRegistry registry,
+ String job, Map 

[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-09 Thread stoader
Github user stoader commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r167359075
  
--- Diff: 
core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java
 ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.prometheus.client.exporter;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+/**
+ * Export metrics via the Prometheus Pushgateway.
+ * 
+ * The Prometheus Pushgateway exists to allow ephemeral and
+ * batch jobs to expose their metrics to Prometheus.
+ * Since these kinds of jobs may not exist long enough to be scraped,
+ * they can instead push their metrics to a Pushgateway.
+ * This class allows pushing the contents of a {@link CollectorRegistry} to
+ * a Pushgateway.
+ * 
+ * Example usage:
+ * 
+ * {@code
+ *   void executeBatchJob() throws Exception {
+ * CollectorRegistry registry = new CollectorRegistry();
+ * Gauge duration = Gauge.build()
+ * .name("my_batch_job_duration_seconds")
+ * .help("Duration of my batch job in seconds.")
+ * .register(registry);
+ * Gauge.Timer durationTimer = duration.startTimer();
+ * try {
+ *   // Your code here.
+ *
+ *   // This is only added to the registry after success,
+ *   // so that a previous success in the Pushgateway isn't 
overwritten on failure.
+ *   Gauge lastSuccess = Gauge.build()
+ *   .name("my_batch_job_last_success")
+ *   .help("Last time my batch job succeeded, in unixtime.")
+ *   .register(registry);
+ *   lastSuccess.setToCurrentTime();
+ * } finally {
+ *   durationTimer.setDuration();
+ *   PushGatewayWithTimestamp pg = new 
PushGatewayWithTimestamp("127.0.0.1:9091");
+ *   pg.pushAdd(registry, "my_batch_job");
+ * }
+ *   }
+ * }
+ * 
+ * 
+ * See https://github.com/prometheus/pushgateway
+ */
+public class PushGatewayWithTimestamp {
+
+private static final Logger logger = 
LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
+private final String address;
+private static final int SECONDS_PER_MILLISECOND = 1000;
+/**
+ * Construct a Pushgateway, with the given address.
+ * 
+ * @param address  host:port or ip:port of the Pushgateway.
+ */
+public PushGatewayWithTimestamp(String address) {
+this.address = address;
+}
+
+/**
+ * Pushes all metrics in a registry,
+ * replacing all those with the same job and no grouping key.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(CollectorRegistry registry, String job) throws 
IOException {
+doRequest(registry, job, null, "PUT", null);
+}
+
+/**
+ * Pushes all metrics in a Collector,
+ * replacing all those with the same job and no grouping key.
+ * 
+ * This is useful for pushing a single Gauge.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(Collector collector, String job) throws IOException {
+CollectorRegistry registry = new CollectorRegistry();
+collector.register(registry);
+push(registry, job);
+}
+
+/**
+ * Pushes all metrics in a registry,
+ * replacing all those with the same job and grouping key.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(CollectorRegistry registry,
+ String job, Map 

[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-09 Thread stoader
Github user stoader commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r167359048
  
--- Diff: 
core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/TextFormatWithTimestamp.java
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.prometheus.client.exporter;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.Enumeration;
+
+import io.prometheus.client.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TextFormatWithTimestamp {
+private static final Logger logger = 
LoggerFactory.getLogger(TextFormatWithTimestamp.class);
+
+/**
+ * Content-type for text version 0.0.4.
+ */
+public static final String CONTENT_TYPE_004 = "text/plain; 
version=0.0.4; charset=utf-8";
+
+private static StringBuilder jsonMessageLogBuilder = new 
StringBuilder();
--- End diff --

This class has been refactored.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-09 Thread stoader
Github user stoader commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r167359010
  
--- Diff: 
core/src/main/scala/org/apache/spark/metrics/sink/PrometheusSink.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.net.URI
+import java.util
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import com.codahale.metrics._
+import io.prometheus.client.CollectorRegistry
+import io.prometheus.client.dropwizard.DropwizardExports
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext, 
SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.METRICS_NAMESPACE
+import org.apache.spark.metrics.MetricsSystem
+import 
org.apache.spark.metrics.prometheus.client.exporter.PushGatewayWithTimestamp
+
+
+private[spark] class PrometheusSink(
+ val property: Properties,
+ val registry: MetricRegistry,
+ securityMgr: SecurityManager)
+  extends Sink with Logging {
+
+  protected class Reporter(registry: MetricRegistry)
+extends ScheduledReporter(
+  registry,
+  "prometheus-reporter",
+  MetricFilter.ALL,
+  TimeUnit.SECONDS,
+  TimeUnit.MILLISECONDS) {
+
+val defaultSparkConf: SparkConf = new SparkConf(true)
+
+override def report(
+ gauges: util.SortedMap[String, Gauge[_]],
+ counters: util.SortedMap[String, Counter],
+ histograms: util.SortedMap[String, Histogram],
+ meters: util.SortedMap[String, Meter],
+ timers: util.SortedMap[String, Timer]): Unit = {
+
+  // SparkEnv may become available only after metrics sink creation 
thus retrieving
+  // SparkConf from spark env here and not during the 
creation/initialisation of PrometheusSink.
+  val sparkConf: SparkConf = 
Option(SparkEnv.get).map(_.conf).getOrElse(defaultSparkConf)
+
+  val metricsNamespace: Option[String] = 
sparkConf.get(METRICS_NAMESPACE)
+  val sparkAppId: Option[String] = sparkConf.getOption("spark.app.id")
+  val executorId: Option[String] = 
sparkConf.getOption("spark.executor.id")
+
+  logInfo(s"metricsNamespace=$metricsNamespace, 
sparkAppId=$sparkAppId, " +
+s"executorId=$executorId")
+
+  val role: String = (sparkAppId, executorId) match {
+case (Some(_), Some(SparkContext.DRIVER_IDENTIFIER)) => "driver"
+case (Some(_), Some(_)) => "executor"
+case _ => "shuffle"
+  }
+
+  val job: String = role match {
+case "driver" => metricsNamespace.getOrElse(sparkAppId.get)
+case "executor" => metricsNamespace.getOrElse(sparkAppId.get)
+case _ => metricsNamespace.getOrElse("shuffle")
+  }
+  logInfo(s"role=$role, job=$job")
+
+  val groupingKey: Map[String, String] = (role, executorId) match {
+case ("driver", _) => Map("role" -> role)
+case ("executor", Some(id)) => Map ("role" -> role, "number" -> id)
+case _ => Map("role" -> role)
+  }
+
+
--- End diff --

Empty line removed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19775: [SPARK-22343][core] Add support for publishing Sp...

2018-02-09 Thread stoader
Github user stoader commented on a diff in the pull request:

https://github.com/apache/spark/pull/19775#discussion_r167359085
  
--- Diff: 
core/src/main/java/org/apache/spark/metrics/prometheus/client/exporter/PushGatewayWithTimestamp.java
 ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.prometheus.client.exporter;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+/**
+ * Export metrics via the Prometheus Pushgateway.
+ * 
+ * The Prometheus Pushgateway exists to allow ephemeral and
+ * batch jobs to expose their metrics to Prometheus.
+ * Since these kinds of jobs may not exist long enough to be scraped,
+ * they can instead push their metrics to a Pushgateway.
+ * This class allows pushing the contents of a {@link CollectorRegistry} to
+ * a Pushgateway.
+ * 
+ * Example usage:
+ * 
+ * {@code
+ *   void executeBatchJob() throws Exception {
+ * CollectorRegistry registry = new CollectorRegistry();
+ * Gauge duration = Gauge.build()
+ * .name("my_batch_job_duration_seconds")
+ * .help("Duration of my batch job in seconds.")
+ * .register(registry);
+ * Gauge.Timer durationTimer = duration.startTimer();
+ * try {
+ *   // Your code here.
+ *
+ *   // This is only added to the registry after success,
+ *   // so that a previous success in the Pushgateway isn't 
overwritten on failure.
+ *   Gauge lastSuccess = Gauge.build()
+ *   .name("my_batch_job_last_success")
+ *   .help("Last time my batch job succeeded, in unixtime.")
+ *   .register(registry);
+ *   lastSuccess.setToCurrentTime();
+ * } finally {
+ *   durationTimer.setDuration();
+ *   PushGatewayWithTimestamp pg = new 
PushGatewayWithTimestamp("127.0.0.1:9091");
+ *   pg.pushAdd(registry, "my_batch_job");
+ * }
+ *   }
+ * }
+ * 
+ * 
+ * See https://github.com/prometheus/pushgateway
+ */
+public class PushGatewayWithTimestamp {
+
+private static final Logger logger = 
LoggerFactory.getLogger(PushGatewayWithTimestamp.class);
+private final String address;
+private static final int SECONDS_PER_MILLISECOND = 1000;
+/**
+ * Construct a Pushgateway, with the given address.
+ * 
+ * @param address  host:port or ip:port of the Pushgateway.
+ */
+public PushGatewayWithTimestamp(String address) {
+this.address = address;
+}
+
+/**
+ * Pushes all metrics in a registry,
+ * replacing all those with the same job and no grouping key.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(CollectorRegistry registry, String job) throws 
IOException {
+doRequest(registry, job, null, "PUT", null);
+}
+
+/**
+ * Pushes all metrics in a Collector,
+ * replacing all those with the same job and no grouping key.
+ * 
+ * This is useful for pushing a single Gauge.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(Collector collector, String job) throws IOException {
+CollectorRegistry registry = new CollectorRegistry();
+collector.register(registry);
+push(registry, job);
+}
+
+/**
+ * Pushes all metrics in a registry,
+ * replacing all those with the same job and grouping key.
+ * 
+ * This uses the PUT HTTP method.
+ */
+public void push(CollectorRegistry registry,
+ String job, Map 
