[GitHub] spark pull request #22282: [SPARK-23539][SS] Add support for Kafka headers i...

2018-08-30 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22282#discussion_r214073971
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
 ---
@@ -131,9 +158,25 @@ private[kafka010] abstract class KafkaRowWriter(
 throw new 
IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
   s"attribute unsupported type ${t.catalogString}")
 }
+val headersExpression = inputSchema
+  .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
+  Literal(CatalystTypeConverters.convertToCatalyst(null), 
MapType(StringType, BinaryType))
+)
+headersExpression.dataType match {
+  case MapType(StringType, BinaryType, true) => // good
+  case t =>
+throw new 
IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " +
--- End diff --

This exception is different from the AnalysisException thrown in the next class. What's the reason for the difference?


---




[GitHub] spark issue #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client version...

2018-08-01 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21955
  
Thanks for the follow-up.

lgtm


---




[GitHub] spark issue #21488: [SPARK-18057][SS] Update Kafka client version from 0.10....

2018-08-01 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
I used the following command and the test passed:

mvn test -Phadoop-2.6 -Pyarn -Phive -Dtest=KafkaMicroBatchSourceSuite -rf 
external/kafka-0-10-sql

Please take a look at the 'Disk error' message and see whether it was related to the test failure.


---




[GitHub] spark issue #21488: [SPARK-18057][SS] Update Kafka client version from 0.10....

2018-07-30 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
@zsxwing 
Is there anything I should do for this PR ?


---




[GitHub] spark issue #21488: [SPARK-18057][SS] Update Kafka client version from 0.10....

2018-07-29 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
retest this please


---




[GitHub] spark issue #21488: [SPARK-18057][SS] Update Kafka client version from 0.10....

2018-07-29 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
```
22:36:05.028 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 
in stage 16314.0 (TID 39181, localhost, executor driver): 
java.io.FileNotFoundException: File 
file:/home/jenkins/workspace/SparkPullRequestBuilder/target/tmp/spark-0bbc239c-37c5-4df2-b86d-e9c7628ceb28/f1=1/f2=1/part-0-390ac6da-50dc-4d32-ba08-462da1e8a0c2.c000.snappy.parquet
 does not exist
It is possible the underlying files have been updated. You can explicitly 
invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in 
SQL or by recreating the Dataset/DataFrame involved.
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:131)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
```
This doesn't seem to be related to the PR.


---




[GitHub] spark issue #21488: [SPARK-18057][SS] Update Kafka client version from 0.10....

2018-07-23 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
Ryan:
Thanks for the close follow-up.

Once Kafka 2.0.0 is released, I will incorporate the above.


---




[GitHub] spark issue #21488: [SPARK-18057][SS] Update Kafka client version from 0.10....

2018-07-23 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
@zsxwing 
Is there anything that needs to be done from my side ?


---




[GitHub] spark issue #21488: [SPARK-18057][SS] Update Kafka client version from 0.10....

2018-07-20 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
The test failure was in a Hive test and is not related to this PR.


---




[GitHub] spark issue #21488: [SPARK-18057][SS] Update Kafka client version from 0.10....

2018-07-20 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
Thanks for the reminder, @ijuma 

Updated pom.xml and title accordingly.


---




[GitHub] spark issue #21488: [SPARK-18057][SS] Update Kafka client version from 0.10....

2018-07-20 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
Ryan:
Thanks for the reminder.

I have disabled that test.


---




[GitHub] spark issue #21488: SPARK-18057 Update structured streaming kafka from 0.10....

2018-07-17 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
Pulled in your commits.

Will look at test failures.


---




[GitHub] spark issue #21488: SPARK-18057 Update structured streaming kafka from 0.10....

2018-07-17 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
I'm not sure what to do with the following build error, which is not caused by the PR:
```
[ERROR] 
/spark/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala:71:
 Symbol 'term org.eclipse' is missing from the classpath.
This symbol is required by 'method 
org.apache.spark.metrics.MetricsSystem.getServletHandlers'.
Make sure that term eclipse is in your classpath and check for conflicting 
dependencies with `-Ylog-classpath`.
A full rebuild may help if 'MetricsSystem.class' was compiled against an 
incompatible version of org.
[ERROR] val kafkaParams = Map[String, Object](
[ERROR]  ^
```


---




[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-07-17 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21488#discussion_r203106522
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -115,7 +116,7 @@ private[kafka010] class KafkaOffsetReader(
   def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
 assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
 // Poll to get the latest assigned partitions
-consumer.poll(0)
+consumer.poll(JDuration.ofMillis(0))
--- End diff --

Depending on the Kafka release we agree upon, I can revert. Duration is the recommended API as of the 2.0.0 release.


---




[GitHub] spark issue #21488: SPARK-18057 Update structured streaming kafka from 0.10....

2018-07-17 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
@ijuma 
Sorry for the late response. 9 days ago I was in China, where access to Gmail is intermittent.


---




[GitHub] spark issue #21488: SPARK-18057 Update structured streaming kafka from 0.10....

2018-07-17 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
Regarding a stable Kafka release, it seems 2.0.0 RC2 would pass the vote:

http://search-hadoop.com/m/Kafka/uyzND1ClBEezundG1?subj=Re+VOTE+2+0+0+RC2


---




[GitHub] spark issue #21700: [SPARK-24717][SS] Split out max retain version of state ...

2018-07-05 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21700
  
Please publish the above results to the thread where you requested review 
from committers.


---




[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-05 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r200248889
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -240,7 +244,11 @@ private[state] class HDFSBackedStateStoreProvider 
extends StateStoreProvider wit
   @volatile private var storeConf: StateStoreConf = _
   @volatile private var hadoopConf: Configuration = _
 
-  private lazy val loadedMaps = new mutable.HashMap[Long, MapType]
+  // taking default value first: this will be updated by init method with 
configuration
+  @volatile private var numberOfVersionsRetainInMemory: Int = 2
--- End diff --

numberOfVersionsRetainInMemory -> numberOfVersionsToRetainInMemory


---




[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-05 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r200242174
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming.state;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * This class implements bounded {@link java.util.SortedMap} based on 
{@link java.util.TreeMap}.
+ *
+ * As TreeMap does, this implementation sorts elements in natural order, 
and cuts off
+ * smaller elements to retain at most bigger N elements.
+ *
+ * You can provide reversed order of comparator to retain smaller elements 
instead.
+ *
+ * This class is not thread-safe, so synchronization would be needed to 
use this concurrently.
+ *
+ * @param  key type
+ * @param  value type
+ */
+public class BoundedSortedMap extends TreeMap {
+
+  private final int limit;
+
+  /**
+   * Constructor
+   *
+   * @param comparator comparator instance to compare between keys
+   * @param limit  bounded size
+   */
+  public BoundedSortedMap(Comparator comparator, int limit) {
+super(comparator);
+this.limit = limit;
+  }
+
+  @Override
+  public void putAll(Map map) {
--- End diff --

Should the map parameter be of type SortedMap? With an ordinary Map, the traversal order is not fixed, which may produce a non-deterministic result if the map's size is bigger than this BoundedSortedMap's limit.


---




[GitHub] spark pull request #21700: [SPARK-24717][SS] Split out max retain version of...

2018-07-05 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21700#discussion_r200242876
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/streaming/state/BoundedSortedMap.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming.state;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * This class implements bounded {@link java.util.SortedMap} based on 
{@link java.util.TreeMap}.
+ *
+ * As TreeMap does, this implementation sorts elements in natural order, 
and cuts off
+ * smaller elements to retain at most bigger N elements.
+ *
+ * You can provide reversed order of comparator to retain smaller elements 
instead.
+ *
+ * This class is not thread-safe, so synchronization would be needed to 
use this concurrently.
+ *
+ * @param  key type
+ * @param  value type
+ */
+public class BoundedSortedMap extends TreeMap {
+
+  private final int limit;
+
+  /**
+   * Constructor
+   *
+   * @param comparator comparator instance to compare between keys
+   * @param limit  bounded size
+   */
+  public BoundedSortedMap(Comparator comparator, int limit) {
+super(comparator);
+this.limit = limit;
+  }
+
+  @Override
+  public void putAll(Map map) {
+for (Map.Entry entry : map.entrySet()) {
--- End diff --

I can think of some optimizations here:
If the map's size is bigger than or equal to this BoundedSortedMap's limit and map.lastKey() is lower than this.firstKey(), you can call clear() on this sorted map first, since all of its current elements would be evicted anyway.
On the other hand, if map.firstKey() is higher than this.lastKey() and this sorted map is at full capacity, there is no need to enter the loop at all - no element from map would be retained.


---




[GitHub] spark issue #21651: [SPARK-18258] Sink need access to offset representation

2018-06-28 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21651
  
retest this please


---




[GitHub] spark issue #21651: [SPARK-18258] Sink need access to offset representation

2018-06-27 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21651
  
cc @tdas 


---




[GitHub] spark issue #21488: SPARK-18057 Update structured streaming kafka from 0.10....

2018-06-13 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
Located the test output:
```
-rw-r--r-- 1 hbase hadoop 35335485506 Jun 13 20:36 target/unit-tests.log
```
Still need to find the cause of the assertion failure.


---




[GitHub] spark issue #21488: SPARK-18057 Update structured streaming kafka from 0.10....

2018-06-08 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
Made some progress in testing.
Now facing:
```
- assign from latest offsets (failOnDataLoss: true) *** FAILED ***
  java.lang.IllegalArgumentException: requirement failed
  at scala.Predef$.require(Predef.scala:212)
  at 
org.apache.spark.sql.kafka010.KafkaSourceSuiteBase.org$apache$spark$sql$kafka010$KafkaSourceSuiteBase$$testFromLatestOffsets(KafkaMicroBatchSourceSuite.scala:993)
  at 
org.apache.spark.sql.kafka010.KafkaSourceSuiteBase$$anonfun$37$$anonfun$apply$2.apply$mcV$sp(KafkaMicroBatchSourceSuite.scala:734)
  at 
org.apache.spark.sql.kafka010.KafkaSourceSuiteBase$$anonfun$37$$anonfun$apply$2.apply(KafkaMicroBatchSourceSuite.scala:732)
  at 
org.apache.spark.sql.kafka010.KafkaSourceSuiteBase$$anonfun$37$$anonfun$apply$2.apply(KafkaMicroBatchSourceSuite.scala:732)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
```


---




[GitHub] spark issue #21488: SPARK-18057 Update structured streaming kafka from 0.10....

2018-06-06 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
There is only target/surefire-reports/TEST-org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite.xml under target/surefire-reports.

That file doesn't contain the test output.


---




[GitHub] spark issue #21488: SPARK-18057 Update structured streaming kafka from 0.10....

2018-06-05 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
I tried the following change, but it didn't seem to produce more output from Kafka:
```
diff --git a/external/kafka-0-10-sql/src/test/resources/log4j.properties 
b/external/kafka-0-10-sql/src/test/resources/log4j.properties
index 75e3b53..0d65339 100644
--- a/external/kafka-0-10-sql/src/test/resources/log4j.properties
+++ b/external/kafka-0-10-sql/src/test/resources/log4j.properties
@@ -25,4 +25,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd 
HH:mm:ss.SSS} %t %p %c{

 # Ignore messages below warning level from Jetty, because it's a bit 
verbose
 log4j.logger.org.spark-project.jetty=WARN
-
+log4j.logger.org.apache.kafka=DEBUG
```


---




[GitHub] spark issue #21488: SPARK-18057 Update structured streaming kafka from 0.10....

2018-06-05 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21488
  
Currently I am trying to get the test suite to pass first.




---




[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-06-03 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21488#discussion_r192601997
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ---
@@ -96,10 +101,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, 
Object] = Map.empty) extends L
   // Set up the Embedded Zookeeper server and get the proper Zookeeper port
   private def setupEmbeddedZookeeper(): Unit = {
 // Zookeeper server startup
-zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+val zkSvr = s"$zkHost:$zkPort";
+zookeeper = new EmbeddedZookeeper(zkSvr)
 // Get the actual zookeeper binding port
 zkPort = zookeeper.actualPort
-zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, 
zkConnectionTimeout, false)
+zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false)
+zkClient = KafkaZkClient(zkSvr, false, 6000, 1, Int.MaxValue, 
Time.SYSTEM)
+adminZkClient = new AdminZkClient(zkClient)
--- End diff --

AdminClient is abstract.
KafkaAdminClient doesn't provide addPartitions.

Mind giving some pointers?


---




[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-06-03 Thread tedyu
GitHub user tedyu opened a pull request:

https://github.com/apache/spark/pull/21488

SPARK-18057 Update structured streaming kafka from 0.10.0.1 to 2.0.0

## What changes were proposed in this pull request?

This PR upgrades to the Kafka 2.0.0 release where KIP-266 is integrated.

## How was this patch tested?

This PR uses existing Kafka related unit tests

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tedyu/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21488.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21488


commit 0a22686d9a388a21d5dd38513854341d3f37f738
Author: tedyu 
Date:   2018-06-03T19:54:22Z

SPARK-18057 Update structured streaming kafka from 0.10.0.1 to 2.0.0




---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-04-27 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r184736836
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 ---
@@ -116,21 +118,44 @@ object DataWritingSparkTask extends Logging {
   def run(
   writeTask: DataWriterFactory[InternalRow],
   context: TaskContext,
-  iter: Iterator[InternalRow]): WriterCommitMessage = {
-val dataWriter = writeTask.createDataWriter(context.partitionId(), 
context.attemptNumber())
+  iter: Iterator[InternalRow],
+  useCommitCoordinator: Boolean): WriterCommitMessage = {
+val stageId = context.stageId()
+val partId = context.partitionId()
+val attemptId = context.attemptNumber()
+val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
 // write the data and commit this writer.
 Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
   iter.foreach(dataWriter.write)
-  logInfo(s"Writer for partition ${context.partitionId()} is 
committing.")
-  val msg = dataWriter.commit()
-  logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+  val msg = if (useCommitCoordinator) {
+val coordinator = SparkEnv.get.outputCommitCoordinator
+val commitAuthorized = coordinator.canCommit(context.stageId(), 
partId, attemptId)
+if (commitAuthorized) {
+  logInfo(s"Writer for stage $stageId, task $partId.$attemptId is 
authorized to commit.")
+  dataWriter.commit()
+} else {
+  val message = s"Stage $stageId, task $partId.$attemptId: driver 
did not authorize commit"
+  logInfo(message)
--- End diff --

This should be WARN or ERROR, since an exception is thrown below.


---




[GitHub] spark issue #21124: [SPARK-23004][SS] Ensure StateStore.commit is called onl...

2018-04-22 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21124
  
+1


---




[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...

2018-04-20 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/21109
  
retest this please


---




[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...

2018-03-16 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/20767
  
Interesting.


https://commons.apache.org/proper/commons-pool/apidocs/org/apache/commons/pool2/impl/BaseGenericObjectPool.html#getBorrowedCount()
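
For reference, a sketch of the counters that class exposes, assuming a hypothetical commons-pool2-backed consumer pool (which is not how the current cache is implemented):
```scala
import org.apache.commons.pool2.impl.GenericObjectPool

// Hypothetical: if the consumer cache were backed by a commons-pool2 pool,
// its built-in counters could serve as the cache metrics discussed above.
def reportPoolMetrics[T](pool: GenericObjectPool[T]): Unit = {
  println(s"consumers borrowed=${pool.getBorrowedCount} " +
    s"created=${pool.getCreatedCount} destroyed=${pool.getDestroyedCount} " +
    s"active=${pool.getNumActive} idle=${pool.getNumIdle}")
}
```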


---




[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...

2018-03-16 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/20767
  
I did a quick search for 'apache commons pool metrics', which didn't turn up any directly related links.


---




[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...

2018-03-16 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/20767
  
@tdas 
Do you think a follow-on JIRA can be logged for adding metrics for the cache operations?

Thanks


---




[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...

2018-03-15 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r174984237
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -467,44 +435,58 @@ private[kafka010] object KafkaDataConsumer extends 
Logging {
   // If this is reattempt at running the task, then invalidate cached 
consumer if any and
   // start with a new one.
   if (existingInternalConsumer != null) {
-if (existingInternalConsumer.inuse) {
-  // Consumer exists in cache and is somehow in use. Don't close 
it immediately, but
-  // mark it for being closed when it is released.
+// Consumer exists in cache. If its in use, mark it for closing 
later, or close it now.
+if (existingInternalConsumer.inUse) {
   existingInternalConsumer.markedForClose = true
-  NonCachedKafkaDataConsumer(newInternalConsumer)
-
 } else {
-  // Consumer exists in cache and is not in use, so close it 
immediately and replace
-  // it with a new one.
   existingInternalConsumer.close()
-  cache.put(key, newInternalConsumer)
-  CachedKafkaDataConsumer(newInternalConsumer)
-
 }
-  } else {
-// Consumer is not cached, put the new one in the cache
-cache.put(key, newInternalConsumer)
-CachedKafkaDataConsumer(newInternalConsumer)
-
   }
+  cache.remove(key)  // Invalidate the cache in any case
+  NonCachedKafkaDataConsumer(newInternalConsumer)
+
 } else if (!useCache) {
   // If planner asks to not reuse consumers, then do not use it, 
return a new consumer
   NonCachedKafkaDataConsumer(newInternalConsumer)
 
 } else if (existingInternalConsumer == null) {
   // If consumer is not already cached, then put a new in the cache 
and return it
-  newInternalConsumer.inuse = true
   cache.put(key, newInternalConsumer)
+  newInternalConsumer.inUse = true
   CachedKafkaDataConsumer(newInternalConsumer)
 
-} else if (existingInternalConsumer.inuse) {
+} else if (existingInternalConsumer.inUse) {
   // If consumer is already cached but is currently in use, then 
return a new consumer
   NonCachedKafkaDataConsumer(newInternalConsumer)
--- End diff --

Maybe keep an internal counter for how many times a non-cached consumer is created. This would give us information on how effective the cache is.
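
A minimal sketch of such a counter, with hypothetical names (not part of this PR); where the increments go and how the value is exposed would be up to the implementation:
```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical counters that could live in the KafkaDataConsumer object,
// bumped where NonCachedKafkaDataConsumer / CachedKafkaDataConsumer are
// returned from acquire().
object ConsumerCacheStats {
  val nonCachedCreated = new AtomicLong(0)
  val cachedReused = new AtomicLong(0)

  /** Fraction of acquisitions served from the cache. */
  def hitRatio: Double = {
    val hits = cachedReused.get()
    val total = hits + nonCachedCreated.get()
    if (total == 0) 1.0 else hits.toDouble / total
  }
}
```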


---




[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...

2018-03-10 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r173636109
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 }
   }
 
-  def releaseKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
+  private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
 synchronized {
-  val consumer = cache.get(key)
-  if (consumer != null) {
-consumer.inuse = false
-  } else {
-logWarning(s"Attempting to release consumer that does not exist")
-  }
-}
-  }
 
-  /**
-   * Removes (and closes) the Kafka Consumer for the given topic, 
partition and group id.
-   */
-  def removeKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-synchronized {
-  val removedConsumer = cache.remove(key)
-  if (removedConsumer != null) {
-removedConsumer.close()
+  // If it has been marked for close, then do it any way
+  if (intConsumer.inuse && intConsumer.markedForClose) 
intConsumer.close()
+  intConsumer.inuse = false
+
+  // Clear the consumer from the cache if this is indeed the consumer 
present in the cache
+  val key = new CacheKey(intConsumer.topicPartition, 
intConsumer.kafkaParams)
+  val cachedIntConsumer = cache.get(key)
+  if (cachedIntConsumer != null) {
+if (cachedIntConsumer.eq(intConsumer)) {
+  // The released consumer is indeed the cached one.
+  cache.remove(key)
+} else {
+  // The released consumer is not the cached one. Don't do 
anything.
+  // This should not happen as long as we maintain the invariant 
mentioned above.
+  logWarning(
+s"Cached consumer not the same one as the one being release" +
+  s"\ncached = $cachedIntConsumer 
[${System.identityHashCode(cachedIntConsumer)}]" +
+  s"\nreleased = $intConsumer 
[${System.identityHashCode(intConsumer)}]")
+}
+  } else {
+// The released consumer is not in the cache. Don't do anything.
+// This should not happen as long as we maintain the invariant 
mentioned above.
+logWarning(s"Attempting to release consumer that is not in the 
cache")
   }
 }
   }
 
   /**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using 
kafkaParams.
+   * The returned consumer must be released explicitly using 
[[KafkaDataConsumer.release()]].
+   *
+   * Note: This method guarantees that the consumer returned is not 
currently in use by any one
+   * else. Within this guarantee, this will make a best effort attempt to 
re-use consumers by
+   * caching them and tracking when they are in use.
*/
-  def getOrCreate(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = 
synchronized {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-// If this is reattempt at running the task, then invalidate cache and 
start with
-// a new consumer
+  def acquire(
+  topicPartition: TopicPartition,
+  kafkaParams: ju.Map[String, Object],
+  useCache: Boolean): KafkaDataConsumer = synchronized {
+val key = new CacheKey(topicPartition, kafkaParams)
+val existingInternalConsumer = cache.get(key)
+
+lazy val newInternalConsumer = new 
InternalKafkaConsumer(topicPartition, kafkaParams)
+
 if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-  removeKafkaConsumer(topic, partition, kafkaParams)
-  val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
-  consumer.inuse = true
  

[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...

2018-03-10 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r173636002
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 }
   }
 
-  def releaseKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
+  private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
 synchronized {
-  val consumer = cache.get(key)
-  if (consumer != null) {
-consumer.inuse = false
-  } else {
-logWarning(s"Attempting to release consumer that does not exist")
-  }
-}
-  }
 
-  /**
-   * Removes (and closes) the Kafka Consumer for the given topic, 
partition and group id.
-   */
-  def removeKafkaConsumer(
-  topic: String,
-  partition: Int,
-  kafkaParams: ju.Map[String, Object]): Unit = {
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-val topicPartition = new TopicPartition(topic, partition)
-val key = CacheKey(groupId, topicPartition)
-
-synchronized {
-  val removedConsumer = cache.remove(key)
-  if (removedConsumer != null) {
-removedConsumer.close()
+  // If it has been marked for close, then do it any way
+  if (intConsumer.inuse && intConsumer.markedForClose) 
intConsumer.close()
--- End diff --

Is it possible that we end up in the following state, and should intConsumer.close() be called in that case?

!intConsumer.inuse && intConsumer.markedForClose


---




[GitHub] spark pull request #14568: [SPARK-10868] monotonicallyIncreasingId() support...

2016-08-31 Thread tedyu
Github user tedyu closed the pull request at:

https://github.com/apache/spark/pull/14568


---



[GitHub] spark issue #14568: [SPARK-10868] monotonicallyIncreasingId() supports offse...

2016-08-17 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/14568
  
I don't think so.
Using (id & 8589934591) would obtain the numbers 99 and 199 in my example.
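
For context on the mask: monotonically_increasing_id() packs the partition index into the upper 31 bits and the per-partition record number into the lower 33 bits, and 8589934591 is exactly the lower-33-bit mask:
```scala
val lower33Bits = (1L << 33) - 1   // 8589934591L, the mask used above

// An id for record 99 of partition 1 packs as (1L << 33) | 99:
val id = (1L << 33) | 99L          // 8589934691L
val record = id & lower33Bits      // 99: the partition index is masked away
```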


---



[GitHub] spark issue #14568: [SPARK-10868] monotonicallyIncreasingId() supports offse...

2016-08-17 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/14568
  
Can you elaborate ?

1st run: Ids 1 to 99 are generated.

2nd run: poll the Id column and obtain 99. Specify 100 as the offset for monotonically_increasing_id(). Ids 100 to 199 are generated.

3rd run: poll the Id column and obtain 199. Specify 200 as the offset for monotonically_increasing_id(). Ids 200 to 299 are generated.
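
A sketch of that workflow, assuming a single partition (so the generated ids stay contiguous, 0 to 99) and an active SparkSession `spark`; monotonically_increasing_id(offset) is the variant proposed in this PR, not an API in released Spark:
```scala
import org.apache.spark.sql.functions.{max, monotonically_increasing_id}

// 1st run: single partition, so ids are contiguous from 0 to 99.
val firstRun = spark.range(0, 100, 1, 1)
  .withColumn("id", monotonically_increasing_id())

// 2nd run: poll the max id and pass the next value as the offset.
// Note: monotonically_increasing_id(offset) is the signature proposed here.
val nextOffset = firstRun.agg(max("id")).head().getLong(0) + 1   // 100
val secondRun = spark.range(0, 100, 1, 1)
  .withColumn("id", monotonically_increasing_id(nextOffset))     // ids 100 to 199
```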


---



[GitHub] spark issue #14568: [SPARK-10868] monotonicallyIncreasingId() supports offse...

2016-08-17 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/14568
  
As Herman commented above, obtaining the lower 33 bits of the id column would allow ids generated from two (or more) executions to form a contiguous range.


---



[GitHub] spark issue #14568: [SPARK-10868] monotonicallyIncreasingId() supports offse...

2016-08-17 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/14568
  
@hvanhovell 
Let me know if there is more I should do for this enhancement.
Thanks


---



[GitHub] spark pull request #14568: [SPARK-10868] monotonicallyIncreasingId() support...

2016-08-17 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/14568#discussion_r75151883
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -426,6 +426,29 @@ def monotonically_increasing_id():
 return Column(sc._jvm.functions.monotonically_increasing_id())
 
 
+@since(2.1)
+def monotonically_increasing_id_w_offset(offset):
--- End diff --

I was planning to do that.
But the @since() annotation becomes confusing.


---



[GitHub] spark issue #14568: [SPARK-10868] monotonicallyIncreasingId() supports offse...

2016-08-17 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/14568
  
The addition of offset support allows users to concatenate rows from 
different datasets.


---



[GitHub] spark issue #14568: [SPARK-10868] monotonicallyIncreasingId() supports offse...

2016-08-16 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/14568
  
With:
spark.range(0, 9, 1, 3).select(monotonically_increasing_id()).show

I got:
```
+-+
|monotonically_increasing_id()|
+-+
|0|
|1|
|2|
|   8589934592|
|   8589934593|
|   8589934594|
|  17179869184|
|  17179869185|
|  17179869186|
+-+
```
The next offset could be 3.
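
The jumps in that output are partition boundaries; each partition's block of ids starts at its partition index shifted left by 33 bits:
```scala
0L << 33   // 0            -> partition 0: ids 0, 1, 2
1L << 33   // 8589934592   -> partition 1: ids 8589934592 to 8589934594
2L << 33   // 17179869184  -> partition 2: ids 17179869184 to 17179869186
```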


---



[GitHub] spark issue #14568: [SPARK-10868] monotonicallyIncreasingId() supports offse...

2016-08-16 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/14568
  
@hvanhovell 
What do you think of the above reply ?


---



[GitHub] spark issue #14568: [SPARK-10868] monotonicallyIncreasingId() supports offse...

2016-08-15 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/14568
  
@hvanhovell 
As Martin said in JIRA:

* Add the index column to A' - this time starting at 200, as there are 
already entries with id's from 0 to 199 (here, monotonicallyInreasingID( 200 ) 
is required.)
* union A and A'

Is the above sample acceptable to you?


---



[GitHub] spark issue #14568: [SPARK-10868] monotonicallyIncreasingId() supports offse...

2016-08-15 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/14568
  
@rxin 
Can you take a look at the python API one more time ?



---



[GitHub] spark issue #14568: [SPARK-10868] monotonicallyIncreasingId() supports offse...

2016-08-14 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/14568
  
```
/home/jenkins/workspace/SparkPullRequestBuilder/dev/mima: line 37: 40498 
Aborted (core dumped) java -XX:MaxPermSize=1g -Xmx2g -cp 
"$TOOLS_CLASSPATH:$OLD_DEPS_CLASSPATH" org.apache.spark.tools.GenerateMIMAIgnore
[error] running /home/jenkins/workspace/SparkPullRequestBuilder/dev/mima 
-Pyarn -Phadoop-2.3 -Pkinesis-asl -Phive-thriftserver -Phive ; received return 
code 134
```
I'm not sure what caused the core dump.


---



[GitHub] spark issue #14568: [SPARK-10868] monotonicallyIncreasingId() supports offse...

2016-08-14 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/14568
  
Jenkins, test this please.


---



[GitHub] spark pull request #14568: [SPARK-10868] monotonicallyIncreasingId() support...

2016-08-14 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/14568#discussion_r74710318
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -426,6 +426,29 @@ def monotonically_increasing_id():
 return Column(sc._jvm.functions.monotonically_increasing_id())
 
 
+@since(2.1)
+def monotonically_increasing_id(offset):
--- End diff --

Or we can default offset to 0, which covers the current usage. But I am not sure which version to put in @since().


---



[GitHub] spark pull request #14568: [SPARK-10868] monotonicallyIncreasingId() support...

2016-08-14 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/14568#discussion_r74710145
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -426,6 +426,29 @@ def monotonically_increasing_id():
 return Column(sc._jvm.functions.monotonically_increasing_id())
 
 
+@since(2.1)
+def monotonically_increasing_id(offset):
--- End diff --

We can introduce a new method which accepts an offset. How about monotonically_increasing_id_w_offset?


---



[GitHub] spark issue #14568: [SPARK-10868] monotonicallyIncreasingId() supports offse...

2016-08-14 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/14568
  
@hvanhovell @rxin :
Is there any other comment I should address ?

Thanks


---



[GitHub] spark pull request #14568: [SPARK-10868] monotonicallyIncreasingId() support...

2016-08-11 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/14568#discussion_r74484517
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
 ---
@@ -81,3 +93,12 @@ case class MonotonicallyIncreasingID() extends 
LeafExpression with Nondeterminis
 
   override def sql: String = s"$prettyName()"
 }
+
+object MonotonicallyIncreasingID {
+  private def parseExpression(expr: Expression): Long = expr match {
+case IntegerLiteral(i) => i.toLong
+case NonNullLiteral(l, LongType) => l.toString.toLong
--- End diff --

After making the change, test still failed.


---



[GitHub] spark pull request #14568: [SPARK-10868] monotonicallyIncreasingId() support...

2016-08-11 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/14568#discussion_r74460557
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
 ---
@@ -81,3 +93,12 @@ case class MonotonicallyIncreasingID() extends 
LeafExpression with Nondeterminis
 
   override def sql: String = s"$prettyName()"
 }
+
+object MonotonicallyIncreasingID {
+  private def parseExpression(expr: Expression): Long = expr match {
+case IntegerLiteral(i) => i.toLong
+case NonNullLiteral(l, LongType) => l.toString.toLong
--- End diff --

```
object NonNullLiteral {
  def unapply(literal: Literal): Option[(Any, DataType)] = {
Option(literal.value).map(_ => (literal.value, literal.dataType))
```
A tuple is returned by the extractor. I don't think l: Long would compile.

If the above doesn't match, I wonder why the AnalysisException from the default case wasn't thrown.


---



[GitHub] spark issue #14568: [SPARK-10868] monotonicallyIncreasingId() supports offse...

2016-08-10 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/14568
  
```
16/08/10 15:35:12 DEBUG HiveSessionState$$anon$1:
=== Result of Batch Resolution ===
!'DeserializeToObject 
unresolveddeserializer(createexternalrow(getcolumnbyordinal(0, LongType), 
StructField(monotonically_increasing_id(),LongType,false))), obj#8: 
org.apache.spark.sql.Row  DeserializeToObject 
createexternalrow(monotonically_increasing_id()#6L, 
StructField(monotonically_increasing_id(),LongType,false)), obj#8: 
org.apache.spark.sql.Row
 +- LocalRelation , [monotonically_increasing_id()#6L]   

   +- LocalRelation , 
[monotonically_increasing_id()#6L]

16/08/10 15:35:12 DEBUG HiveSessionState$$anon$1:
=== Result of Batch Resolution ===
!'DeserializeToObject 
unresolveddeserializer(createexternalrow(getcolumnbyordinal(0, LongType), 
StructField(monotonically_increasing_id(),LongType,false))), obj#10: 
org.apache.spark.sql.Row DeserializeToObject 
createexternalrow(monotonically_increasing_id()#6L, 
StructField(monotonically_increasing_id(),LongType,false)), obj#10: 
org.apache.spark.sql.Row
 +- LocalRelation , [monotonically_increasing_id()#6L]   

+-LocalRelation , 
[monotonically_increasing_id()#6L]

16/08/10 15:35:12 DEBUG package$ExpressionCanonicalizer:
=== Result of Batch CleanExpressions ===
!input[0, int, true] AS value#2   input[0, int, true]
!+- input[0, int, true]

16/08/10 15:35:12 DEBUG package$ExpressionCanonicalizer:
=== Result of Batch CleanExpressions ===
!monotonically_increasing_id(5) AS monotonically_increasing_id()#6L   
monotonically_increasing_id(5)
!+- monotonically_increasing_id(5)
```
I wonder if there is a necessary change that I missed.


---



[GitHub] spark issue #14568: [SPARK-10868] monotonicallyIncreasingId() supports offse...

2016-08-09 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/spark/pull/14568
  
```
[info] - monotonically_increasing_id_with_offset *** FAILED *** (14 
milliseconds)
[info]   org.apache.spark.sql.AnalysisException: Invalid number of 
arguments for function monotonically_increasing_id; line 1 pos 0
[info]   at 
org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$5.apply(FunctionRegistry.scala:457)
[info]   at 
org.apache.spark.sql.catalyst.analysis.FunctionRegistry$$anonfun$5.apply(FunctionRegistry.scala:443)
```
I wonder why 'monotonically_increasing_id(offset: Long): Column' wasn't considered a match.


---



[GitHub] spark pull request #14568: [SPARK-10868] monotonicallyIncreasingId() support...

2016-08-09 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/14568#discussion_r74138850
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
 ---
@@ -40,13 +40,14 @@ import org.apache.spark.sql.types.{DataType, LongType}
   represent the record number within each partition. The assumption is 
that the data frame has
   less than 1 billion partitions, and each partition has less than 8 
billion records.""",
   extended = "> SELECT _FUNC_();\n 0")
-case class MonotonicallyIncreasingID() extends LeafExpression with 
Nondeterministic {
+case class MonotonicallyIncreasingID(offset: Long = 0) extends 
LeafExpression
--- End diff --

```
case class HyperLogLogPlusPlus(
child: Expression,
relativeSD: Double = 0.05,
mutableAggBufferOffset: Int = 0,
inputAggBufferOffset: Int = 0)
```
The change seems to be in line with the HyperLogLogPlusPlus constructor.


---



[GitHub] spark pull request #14568: SPARK-10868 monotonicallyIncreasingId() supports ...

2016-08-09 Thread tedyu
GitHub user tedyu opened a pull request:

https://github.com/apache/spark/pull/14568

SPARK-10868 monotonicallyIncreasingId() supports offset for indexing

## What changes were proposed in this pull request?

This PR adds an offset parameter to monotonicallyIncreasingId().

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tedyu/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14568.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14568


commit 4a4e247d519ff388bd056149d584dd77ea086677
Author: tedyu <yuzhih...@gmail.com>
Date:   2016-08-09T20:12:34Z

SPARK-10868 monotonicallyIncreasingId() supports offset for indexing




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13983: [SPARK-16021] Fill freed memory in test to help c...

2016-07-06 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13983#discussion_r69842071
  
--- Diff: 
common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java ---
@@ -58,4 +61,17 @@ public void overlappingCopyMemory() {
   Assert.assertEquals((byte)i, data[i + 1]);
 }
   }
+
+  @Test
+  public void memoryDebugFillEnabledInTest() {
+Assert.assertTrue(MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED);
--- End diff --

This assertion fails in Jenkins.
Did you intend to set the flag to true in a static block at the beginning 
of the test ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13829: [SPARK-16071][SQL] Checks size limit when doublin...

2016-06-30 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13829#discussion_r69213184
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+class BufferHolderSuite extends SparkFunSuite {
+
+  test("SPARK-16071 Check the size limit to avoid integer overflow") {
+var e = intercept[UnsupportedOperationException] {
+  new BufferHolder(new UnsafeRow(Int.MaxValue / 8))
+}
+assert(e.getMessage.contains("too many fields"))
--- End diff --

Should this string be defined in BufferHolder and referenced here so that 
the test wouldn't break if the exception message is modified ?
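
For example, a small sketch of the idea (the object and names are illustrative, 
not existing Spark code), assuming the fragment can live in one place that both 
BufferHolder and the suite reference:

```
// Illustrative only: one source of truth for the message fragment.
object BufferHolderErrors {
  val TooManyFields: String = "too many fields"
}

// The throwing side would build its message from the constant, and the test
// would assert against the same constant:
//   assert(e.getMessage.contains(BufferHolderErrors.TooManyFields))
```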


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13718: [SPARK-16002][SQL]Sleep when no new data arrives ...

2016-06-21 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13718#discussion_r67971887
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -120,7 +120,13 @@ class FileStreamSource(
 val catalog = new ListingFileCatalog(sparkSession, globbedPaths, 
options, Some(new StructType))
 val files = catalog.allFiles().map(_.getPath.toUri.toString)
 val endTime = System.nanoTime
-logInfo(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 
100}ms")
+val listingTimeMs = (endTime.toDouble - startTime) / 100
+if (listingTimeMs > 2000) {
+  // Output a warning when listing files uses more than 2 seconds.
+  logWarning(s"Listed ${files.size} file(s) in $listingTimeMs ms")
--- End diff --

Should some kind of metric be introduced so that the user doesn't need to 
examine logs ?
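
For example, something along these lines could expose it as a gauge (a sketch 
that only assumes a Dropwizard MetricRegistry is reachable from the source, 
since Spark's metrics system is built on that library; how FileStreamSource 
would actually be wired to a registry is not shown here):

```
import com.codahale.metrics.{Gauge, MetricRegistry}

// Records the duration of the last file listing and publishes it as a gauge.
class FileListingMetrics(registry: MetricRegistry) {
  @volatile private var lastListingTimeMs: Double = 0.0

  registry.register(
    MetricRegistry.name("streaming", "fileSource", "lastListingTimeMs"),
    new Gauge[Double] {
      override def getValue: Double = lastListingTimeMs
    })

  def record(listingTimeMs: Double): Unit = {
    lastListingTimeMs = listingTimeMs
  }
}
```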


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13718: [SPARK-16002][SQL]Sleep when no new data arrives ...

2016-06-21 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13718#discussion_r67970641
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -120,7 +120,13 @@ class FileStreamSource(
 val catalog = new ListingFileCatalog(sparkSession, globbedPaths, 
options, Some(new StructType))
 val files = catalog.allFiles().map(_.getPath.toUri.toString)
 val endTime = System.nanoTime
-logInfo(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 
100}ms")
+val listingTimeMs = (endTime.toDouble - startTime) / 100
+if (listingTimeMs > 2000) {
+  // Output a warning when listing files uses more than 2 seconds.
+  logWarning(s"Listed ${files.size} file(s) in $listingTimeMs ms")
--- End diff --

Should this be at DEBUG level ?

The user wouldn't know what to do seeing these warnings.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13473: [SPARK-15736][CORE] Gracefully handle loss of Dis...

2016-06-02 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13473#discussion_r65649617
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -403,6 +403,17 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Cleanup code run in response to a failed local read.
+   * Must be called while holding a read lock on the block.
+   */
+  private def handleLocalReadFailure(blockId: BlockId): Nothing = {
+releaseLock(blockId)
+// Remove the missing block so that its unavailability is reported to 
the driver
+removeBlock(blockId)
--- End diff --

Looking at BlockInfoManager#lockForWriting(), I think you're right.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13473: [SPARK-15736][CORE] Gracefully handle loss of Dis...

2016-06-02 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13473#discussion_r65648116
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -403,6 +403,17 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Cleanup code run in response to a failed local read.
+   * Must be called while holding a read lock on the block.
+   */
+  private def handleLocalReadFailure(blockId: BlockId): Nothing = {
+releaseLock(blockId)
+// Remove the missing block so that its unavailability is reported to 
the driver
+removeBlock(blockId)
--- End diff --

Should this be called before the releaseLock() call ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13473: [SPARK-15736][CORE] Gracefully handle loss of Dis...

2016-06-02 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13473#discussion_r65648040
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -403,6 +403,17 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Cleanup code run in response to a failed local read.
+   * Must be called while holding a read lock on the block.
+   */
+  private def handleLocalReadFailure(blockId: BlockId): Nothing = {
+releaseLock(blockId)
+// Remove the missing block so that its unavailability is reported to 
the driver
+removeBlock(blockId)
--- End diff --

Should this be called before the releaseLock() call ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13283: [SPARK-15515] [SQL] Error Handling in Running SQL...

2016-06-02 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13283#discussion_r65629823
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -38,6 +40,16 @@ private[sql] class ResolveDataSource(sparkSession: 
SparkSession) extends Rule[Lo
   sparkSession,
   paths = u.tableIdentifier.table :: Nil,
   className = u.tableIdentifier.database.get)
+
+val notSupportDirectQuery = try {
+  !classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
+} catch {
+  case NonFatal(e) => false
--- End diff --

Thanks Ryan.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13283: [SPARK-15515] [SQL] Error Handling in Running SQL...

2016-06-02 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13283#discussion_r65622737
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
---
@@ -38,6 +40,16 @@ private[sql] class ResolveDataSource(sparkSession: 
SparkSession) extends Rule[Lo
   sparkSession,
   paths = u.tableIdentifier.table :: Nil,
   className = u.tableIdentifier.database.get)
+
+val notSupportDirectQuery = try {
+  !classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
+} catch {
+  case NonFatal(e) => false
--- End diff --

When would this happen ?

Should true be returned here ?
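
To make the alternative concrete, a self-contained sketch (FileFormat and 
resolveProvidingClass are stand-ins, not the real types) of treating a failed 
provider lookup as "not directly queryable", so the caller can raise a clear 
error instead of silently claiming support:

```
import scala.util.control.NonFatal

object DirectQueryCheckSketch {
  trait FileFormat
  // Stand-in that fails the same way a bad provider name would.
  def resolveProvidingClass(): Class[_] = Class.forName("does.not.Exist")

  val notSupportDirectQuery: Boolean =
    try !classOf[FileFormat].isAssignableFrom(resolveProvidingClass())
    catch { case NonFatal(_) => true } // unknown provider: be conservative and reject

  def main(args: Array[String]): Unit =
    println(s"notSupportDirectQuery = $notSupportDirectQuery") // prints true here
}
```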


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15345][SQL][PYSPARK]. SparkSession's co...

2016-05-25 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13160#discussion_r64677095
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
---
@@ -771,7 +777,11 @@ object SparkSession {
 
   val sparkConf = new SparkConf()
   options.foreach { case (k, v) => sparkConf.set(k, v) }
-  SparkContext.getOrCreate(sparkConf)
+  val sc = SparkContext.getOrCreate(sparkConf)
+  // maybe this is an existing SparkContext, update its SparkConf 
which maybe used
--- End diff --

We can check whether the sc was pre-existing, right ?

In that case the foreach below is not needed.
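
A sketch of what that check could look like using only public API (an existing 
SparkContext's conf cannot be mutated from outside, so the idea here is to detect 
the pre-existing case by diffing and to surface the options that did not take 
effect; this is one possible approach, not the PR's code):

```
import org.apache.spark.{SparkConf, SparkContext}

object GetOrCreateSketch {
  def getOrCreateContext(options: Map[String, String]): SparkContext = {
    val sparkConf = new SparkConf()
    options.foreach { case (k, v) => sparkConf.set(k, v) }
    val sc = SparkContext.getOrCreate(sparkConf)
    // Any requested option missing from sc's conf means getOrCreate handed back a
    // pre-existing context that was built from a different SparkConf.
    val ignored = options.filterNot { case (k, v) => sc.getConf.getOption(k).contains(v) }
    if (ignored.nonEmpty) {
      println(s"Options not applied to the pre-existing SparkContext: ${ignored.keys.mkString(", ")}")
    }
    sc
  }
}
```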


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15431][SQL] Support LIST FILE(s)|JAR(s)...

2016-05-24 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13212#discussion_r64405314
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -774,13 +774,42 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
   }
 
   /**
-   * Create an [[AddJarCommand]] or [[AddFileCommand]] command depending 
on the requested resource.
+   * Create a [[AddFileCommand]], [[AddJarCommand]], [[ListFilesCommand]] 
or [[ListJarsCommand]]
+   * command depending on the requested operation on resources.
+   * Expected format:
+   * {{{
+   *   ADD (FILE[s] [filepath ...] | JAR[s] [jarpath ...])
+   *   DELETE (FILE[s] [filepath ...] | JAR[s] [jarpath ...])
+   *   LIST (FILE[s] [filepath ...] | JAR[s] [jarpath ...])
+   * }}}
*/
-  override def visitAddResource(ctx: AddResourceContext): LogicalPlan = 
withOrigin(ctx) {
-ctx.identifier.getText.toLowerCase match {
-  case "file" => AddFileCommand(remainder(ctx.identifier).trim)
-  case "jar" => AddJarCommand(remainder(ctx.identifier).trim)
-  case other => throw operationNotAllowed(s"ADD with resource type 
'$other'", ctx)
+  override def visitManageResource(ctx: ManageResourceContext): 
LogicalPlan = withOrigin(ctx) {
+val mayebePaths = remainder(ctx.identifier).trim
+ctx.op.getType match {
+  case SqlBaseParser.ADD =>
+ctx.identifier.getText.toLowerCase match {
+  case "file" => AddFileCommand(mayebePaths)
+  case "jar" => AddJarCommand(mayebePaths)
+  case other => throw operationNotAllowed(s"ADD with resource type 
'$other'", ctx)
+}
+  case SqlBaseParser.DELETE =>
+throw operationNotAllowed(s"DELETE resources", ctx)
+  case SqlBaseParser.LIST =>
+ctx.identifier.getText.toLowerCase match {
+  case "files" | "file" =>
+if (mayebePaths.length > 0) {
+  ListFilesCommand(mayebePaths.split("\\s+"))
--- End diff --

mayebePaths -> maybePaths


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11827] [SQL] Adding java.math.BigIntege...

2016-05-21 Thread tedyu
Github user tedyu commented on the pull request:

https://github.com/apache/spark/pull/10125#issuecomment-220772184
  
When would the addendum be checked in ?

For people using Java 7 it is inconvenient, because they have to modify 
Decimal.scala or the compilation fails.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15273] YarnSparkHadoopUtil#getOutOfMemo...

2016-05-20 Thread tedyu
Github user tedyu commented on the pull request:

https://github.com/apache/spark/pull/13057#issuecomment-220734121
  
@srowen :
Is this ready to go in ?

Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11827] [SQL] Adding java.math.BigIntege...

2016-05-20 Thread tedyu
Github user tedyu commented on the pull request:

https://github.com/apache/spark/pull/10125#issuecomment-220720146
  
See #13233


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11827] [SQL] Adding java.math.BigIntege...

2016-05-20 Thread tedyu
Github user tedyu commented on the pull request:

https://github.com/apache/spark/pull/10125#issuecomment-220716220
  
Looks like bigintval.longValue() should have been used.
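
For anyone who needs to stay on Java 7 locally, a minimal sketch of an 
equivalent bounds check (longValueExact only exists since Java 8); this is just 
one way to express it, not the committed fix:

```
import java.math.BigInteger

object LongValueExactCompat {
  // A value fits in a Long exactly when its minimal two's-complement
  // representation needs at most 63 bits plus the sign bit.
  def longValueExact(bigintval: BigInteger): Long =
    if (bigintval.bitLength() <= 63) bigintval.longValue()
    else throw new ArithmeticException("BigInteger out of long range: " + bigintval)
}
```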


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11827] [SQL] Adding java.math.BigIntege...

2016-05-20 Thread tedyu
Github user tedyu commented on the pull request:

https://github.com/apache/spark/pull/10125#issuecomment-22071
  
This seems to have broken the build for Java 7:
```
sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala:137: 
value longValueExact is not a member of java.math.BigInteger
[ERROR]   this.longVal = bigintval.longValueExact()
[ERROR]^
[ERROR] one error found
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15273] YarnSparkHadoopUtil#getOutOfMemo...

2016-05-20 Thread tedyu
Github user tedyu commented on the pull request:

https://github.com/apache/spark/pull/13057#issuecomment-220588258
  
@srowen 
Gentle ping.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15273] YarnSparkHadoopUtil#getOutOfMemo...

2016-05-19 Thread tedyu
Github user tedyu commented on the pull request:

https://github.com/apache/spark/pull/13057#issuecomment-220256566
  
@srowen 
See if I have addressed all your comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15273] YarnSparkHadoopUtil#getOutOfMemo...

2016-05-17 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13057#discussion_r63563275
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java ---
@@ -334,6 +334,18 @@ static void addPermGenSizeOpt(List<String> cmd) {
   }
 
   /**
+   * Gets the OutOfMemoryError option for Spark if the user hasn't set it.
+   */
+  public static void addOutOfMemoryErrorArgument(List<String> cmd) {
--- End diff --

Can you take a look at my initial attempt ?
```
+  val onOOME = javaOpts.find(x => x.contains("-XX:OnOutOfMemoryError"))
+  if (onOOME == None) {
+    "-XX:OnOutOfMemoryError='kill %p'"
+  } else {
+    ""
+  }
```
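
The same idea as a compilable helper (sketch only, not the merged change): emit 
the kill-on-OOM option only when the user has not already supplied one.

```
object OomArgSketch {
  def outOfMemoryErrorArgument(javaOpts: Seq[String]): String =
    if (javaOpts.exists(_.contains("-XX:OnOutOfMemoryError"))) ""
    else "-XX:OnOutOfMemoryError='kill %p'"

  def main(args: Array[String]): Unit = {
    println(outOfMemoryErrorArgument(Seq("-Xmx4g")))                          // adds kill %p
    println(outOfMemoryErrorArgument(Seq("-XX:OnOutOfMemoryError=jmap %p")))  // user setting wins
  }
}
```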


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15273] YarnSparkHadoopUtil#getOutOfMemo...

2016-05-17 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13057#discussion_r63560321
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java ---
@@ -334,6 +334,18 @@ static void addPermGenSizeOpt(List<String> cmd) {
   }
 
   /**
+   * Gets the OutOfMemoryError option for Spark if the user hasn't set it.
+   */
+  public static void addOutOfMemoryErrorArgument(List<String> cmd) {
--- End diff --

YarnSparkHadoopUtil is written in Scala while this method is in Java.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15273] YarnSparkHadoopUtil#getOutOfMemo...

2016-05-17 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13057#discussion_r63550916
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java ---
@@ -334,6 +334,18 @@ static void addPermGenSizeOpt(List<String> cmd) {
   }
 
   /**
+   * Gets the OutOfMemoryError option for Spark if the user hasn't set it.
+   */
+  public static void addOutOfMemoryErrorArgument(List<String> cmd) {
--- End diff --

Please suggest a suitable class that would be a better host for this Java method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15112][SQL] Allows query plan schema an...

2016-05-16 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/12952#discussion_r63439615
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -163,15 +164,17 @@ object EliminateSerialization extends 
Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
 case d @ DeserializeToObject(_, _, s: SerializeFromObject)
 if d.outputObjectType == s.inputObjectType =>
-  // A workaround for SPARK-14803. Remove this after it is fixed.
-  if (d.outputObjectType.isInstanceOf[ObjectType] &&
-  d.outputObjectType.asInstanceOf[ObjectType].cls == 
classOf[org.apache.spark.sql.Row]) {
-s.child
-  } else {
-// Adds an extra Project here, to preserve the output expr id of 
`DeserializeToObject`.
-val objAttr = Alias(s.child.output.head, "obj")(exprId = 
d.output.head.exprId)
-Project(objAttr :: Nil, s.child)
+  d.outputObjectType match {
+// A workaround for SPARK-14803. Remove this after it is fixed.
+case ObjectType(cls) if cls == classOf[Row] =>
--- End diff --

Now that SPARK-14803 is fixed, this can be dropped.

Should I open a PR ?
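
If so, the rule body would presumably collapse to just the alias-preserving 
branch; a sketch of the remaining shape, adapted from the quoted code (untested 
here):

```
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{DeserializeToObject, LogicalPlan, Project, SerializeFromObject}
import org.apache.spark.sql.catalyst.rules.Rule

object EliminateSerializationSketch extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case d @ DeserializeToObject(_, _, s: SerializeFromObject)
        if d.outputObjectType == s.inputObjectType =>
      // Adds an extra Project to preserve the output expr id of `DeserializeToObject`.
      val objAttr = Alias(s.child.output.head, "obj")(exprId = d.output.head.exprId)
      Project(objAttr :: Nil, s.child)
  }
}
```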


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15273] YarnSparkHadoopUtil#getOutOfMemo...

2016-05-16 Thread tedyu
Github user tedyu commented on the pull request:

https://github.com/apache/spark/pull/13057#issuecomment-219532924
  
@srowen 
Pardon for the ping.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15273] YarnSparkHadoopUtil#getOutOfMemo...

2016-05-15 Thread tedyu
Github user tedyu commented on the pull request:

https://github.com/apache/spark/pull/13057#issuecomment-219308231
  
@srowen 
Gentle ping.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15273] YarnSparkHadoopUtil#getOutOfMemo...

2016-05-13 Thread tedyu
Github user tedyu commented on the pull request:

https://github.com/apache/spark/pull/13057#issuecomment-219050166
  
@srowen 
I think I have addressed your comments.

Cheers


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15273] YarnSparkHadoopUtil#getOutOfMemo...

2016-05-13 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13057#discussion_r63156623
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java ---
@@ -334,6 +334,18 @@ static void addPermGenSizeOpt(List<String> cmd) {
   }
 
   /**
+   * Gets the OutOfMemoryError option for Spark if the user hasn't set it.
+   */
+  public static String getOutOfMemoryErrorArgument(List<String> cmd) {
--- End diff --

See the code in ExecutorRunnable.scala, around line 214:
```
val commands = prefixEnv ++ Seq(
  YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + 
"/bin/java",
  "-server",
...
  YarnSparkHadoopUtil.getOutOfMemoryErrorArgument(javaOpts)) ++
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15273] YarnSparkHadoopUtil#getOutOfMemo...

2016-05-13 Thread tedyu
Github user tedyu commented on the pull request:

https://github.com/apache/spark/pull/13057#issuecomment-218991503
  
@srowen 
Mind taking another look ?

Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15273] YarnSparkHadoopUtil#getOutOfMemo...

2016-05-12 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13057#discussion_r63044248
  
--- Diff: 
launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java ---
@@ -334,6 +334,18 @@ static void addPermGenSizeOpt(List<String> cmd) {
   }
 
   /**
+   * Gets the OutOfMemoryError option for Spark if the user hasn't set it.
+   */
+  public static String getOutOfMemoryErrorArgument(List<String> cmd) {
--- End diff --

The pattern ExecutorRunnable uses is that the return value is added 
explicitly to Java opts.

I think being consistent with the pattern is fine.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15273] YarnSparkHadoopUtil#getOutOfMemo...

2016-05-12 Thread tedyu
Github user tedyu commented on the pull request:

https://github.com/apache/spark/pull/13057#issuecomment-218781776
  
@srowen 
Can you take another look ?

Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: YarnSparkHadoopUtil#getOutOfMemoryErrorArgumen...

2016-05-11 Thread tedyu
GitHub user tedyu opened a pull request:

https://github.com/apache/spark/pull/13057

YarnSparkHadoopUtil#getOutOfMemoryErrorArgument should respect 
OnOutOfMemoryError parameter given by user

## What changes were proposed in this pull request?

As Nirav reported in this thread:
http://search-hadoop.com/m/q3RTtdF3yNLMd7u

YarnSparkHadoopUtil#getOutOfMemoryErrorArgument previously specified 'kill 
%p' unconditionally.
We should respect the parameter given by the user.

## How was this patch tested?

Existing tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tedyu/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/13057.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #13057






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14962][SQL] Do not push down isnotnull/...

2016-05-06 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/12777#discussion_r62393964
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala ---
@@ -56,29 +55,35 @@ import org.apache.spark.sql.sources._
  * known to be convertible.
  */
 private[orc] object OrcFilters extends Logging {
-  def createFilter(filters: Array[Filter]): Option[SearchArgument] = {
+  def createFilter(schema: StructType, filters: Array[Filter]): 
Option[SearchArgument] = {
+val dataTypeMap = schema.map(f => f.name -> f.dataType).toMap
+
 // First, tries to convert each filter individually to see whether 
it's convertible, and then
 // collect all convertible ones to build the final `SearchArgument`.
 val convertibleFilters = for {
   filter <- filters
-  _ <- buildSearchArgument(filter, SearchArgumentFactory.newBuilder())
+  _ <- buildSearchArgument(dataTypeMap, filter, 
SearchArgumentFactory.newBuilder())
 } yield filter
 
 for {
   // Combines all convertible filters using `And` to produce a single 
conjunction
   conjunction <- convertibleFilters.reduceOption(And)
   // Then tries to build a single ORC `SearchArgument` for the 
conjunction predicate
-  builder <- buildSearchArgument(conjunction, 
SearchArgumentFactory.newBuilder())
+  builder <- buildSearchArgument(dataTypeMap, conjunction, 
SearchArgumentFactory.newBuilder())
 } yield builder.build()
   }
 
-  private def buildSearchArgument(expression: Filter, builder: Builder): 
Option[Builder] = {
+  private def buildSearchArgument(
+  dataTypeMap: Map[String, DataType],
+  expression: Filter,
+  builder: Builder): Option[Builder] = {
 def newBuilder = SearchArgumentFactory.newBuilder()
 
-def isSearchableLiteral(value: Any): Boolean = value match {
-  // These are types recognized by the 
`SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
-  case _: String | _: Long | _: Double | _: Byte | _: Short | _: 
Integer | _: Float => true
-  case _: DateWritable | _: HiveDecimal | _: HiveChar | _: HiveVarchar 
=> true
+def isSearchableType(dataType: DataType): Boolean = dataType match {
+  // Only the values in the Spark types below can be recognized by
+  // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
+  case ByteType | ShortType | FloatType | DoubleType => true
--- End diff --

What about BooleanType ?
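
Roughly the shape being asked about (a sketch only; whether BooleanType belongs 
here depends on what SearchArgumentImpl.BuilderImpl.boxLiteral() actually 
accepts, which is exactly the open question):

```
import org.apache.spark.sql.types._

object SearchableTypeSketch {
  def isSearchableType(dataType: DataType): Boolean = dataType match {
    case ByteType | ShortType | FloatType | DoubleType => true
    case IntegerType | LongType | StringType           => true
    case BooleanType                                    => true // <- the case in question
    case _                                              => false
  }
}
```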


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13120] [test-maven] Shade protobuf-java

2016-05-06 Thread tedyu
Github user tedyu commented on the pull request:

https://github.com/apache/spark/pull/10995#issuecomment-217514735
  
@zsxwing @JoshRosen @srowen 
Mind taking another look ?

Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15135][SQL]Make sure SparkSession threa...

2016-05-05 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/12915#discussion_r62265808
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 ---
@@ -330,19 +334,21 @@ class SessionCatalog(
* the same name, then, if that does not exist, return the table from 
the current database.
*/
   def lookupRelation(name: TableIdentifier, alias: Option[String] = None): 
LogicalPlan = {
-val db = name.database.getOrElse(currentDb)
-val table = formatTableName(name.table)
-val relation =
-  if (name.database.isDefined || !tempTables.contains(table)) {
-val metadata = externalCatalog.getTable(db, table)
-SimpleCatalogRelation(db, metadata, alias)
-  } else {
-tempTables(table)
-  }
-val qualifiedTable = SubqueryAlias(table, relation)
-// If an alias was specified by the lookup, wrap the plan in a 
subquery so that
-// attributes are properly qualified with this alias.
-alias.map(a => SubqueryAlias(a, 
qualifiedTable)).getOrElse(qualifiedTable)
+synchronized {
+  val db = name.database.getOrElse(currentDb)
+  val table = formatTableName(name.table)
+  val relation =
+if (name.database.isDefined || !tempTables.contains(table)) {
+  val metadata = externalCatalog.getTable(db, table)
+  SimpleCatalogRelation(db, metadata, alias)
+} else {
+  tempTables(table)
+}
+  val qualifiedTable = SubqueryAlias(table, relation)
--- End diff --

nit: looks like this call and alias.map() can be outside the synchronized 
block.
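
A sketch of that reordering, adapted from the quoted code as it would sit inside 
SessionCatalog (whether the narrower critical section is actually safe is for 
the author to confirm):

```
def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
  val (table, relation) = synchronized {
    val db = name.database.getOrElse(currentDb)
    val tbl = formatTableName(name.table)
    val rel =
      if (name.database.isDefined || !tempTables.contains(tbl)) {
        SimpleCatalogRelation(db, externalCatalog.getTable(db, tbl), alias)
      } else {
        tempTables(tbl)
      }
    (tbl, rel)
  }
  // Wrapping in SubqueryAlias (and the optional extra alias) touches no shared state,
  // so it can stay outside the synchronized block.
  val qualifiedTable = SubqueryAlias(table, relation)
  alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
}
```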


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15052][SQL] Use builder pattern to crea...

2016-05-03 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/12830#discussion_r61987501
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
---
@@ -635,6 +642,122 @@ class SparkSession private(
 
 object SparkSession {
 
+  /**
+   * Builder for [[SparkSession]].
+   */
+  class Builder {
--- End diff --

How about adding a clear() method so that a Builder instance can be reused ?
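
Something like this on the quoted Builder (illustrative sketch, not part of the PR):

```
/**
 * Removes every option set so far, so the same Builder instance can be
 * reused to describe a different session.
 */
def clear(): Builder = synchronized {
  options.clear()
  this
}
```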


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15087][CORE][SQL] Remove AccumulatorV2....

2016-05-03 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/12865#discussion_r61946906
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -394,7 +394,7 @@ private[spark] class TaskSchedulerImpl(
 // deserialized.  This brings trouble to the accumulator 
framework, which depends on
 // serialization to set the `atDriverSide` flag.  Here we call 
`acc.localValue` instead to
--- End diff --

This no longer applies


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14850] Show limit for array size when a...

2016-05-03 Thread tedyu
Github user tedyu closed the pull request at:

https://github.com/apache/spark/pull/12814


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-15052][SQL] Use builder pattern to crea...

2016-05-02 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/12830#discussion_r61821150
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
---
@@ -635,6 +642,122 @@ class SparkSession private(
 
 object SparkSession {
 
+  /**
+   * Builder for [[SparkSession]].
+   */
+  class Builder {
+
+private[this] val options = new 
scala.collection.mutable.HashMap[String, String]
+
+/**
+ * Sets a name for the application, which will be shown in the Spark 
web UI.
+ *
+ * @since 2.0.0
+ */
+def appName(name: String): Builder = config("spark.app.name", name)
+
+/**
+ * Sets a config option. Options set using this method are 
automatically propagated to
+ * both [[SparkConf]] and SparkSession's own configuration.
+ *
+ * @since 2.0.0
+ */
+def config(key: String, value: String): Builder = synchronized {
+  options += key -> value
+  this
+}
+
+/**
+ * Sets a config option. Options set using this method are 
automatically propagated to
+ * both [[SparkConf]] and SparkSession's own configuration.
+ *
+ * @since 2.0.0
+ */
+def config(key: String, value: Long): Builder = synchronized {
--- End diff --

What about other primitive types for the value: Int, Float, Short ?
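
Concretely, the kind of overloads being asked about (a sketch mirroring the 
existing String/Long variants; the `.toString` bodies are an assumption, since 
the quoted diff cuts off before the Long overload's body). Int, Short and Float 
arguments could also resolve against a Long/Double overload through Scala's 
numeric widening, so explicit overloads for those may not even be needed:

```
/** Sets a config option from a Double value (sketch). */
def config(key: String, value: Double): Builder = synchronized {
  options += key -> value.toString
  this
}

/** Sets a config option from a Boolean value (sketch). */
def config(key: String, value: Boolean): Builder = synchronized {
  options += key -> value.toString
  this
}
```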


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14850] Show limit for array size when a...

2016-04-30 Thread tedyu
Github user tedyu commented on the pull request:

https://github.com/apache/spark/pull/12814#issuecomment-215974354
  
```
sbt.ForkMain$ForkError: java.lang.AssertionError: 
expected:<0.9986422261219262> but was:<0.9986422261219272>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:118)
at org.junit.Assert.assertEquals(Assert.java:144)
at 
org.apache.spark.mllib.stat.JavaStatisticsSuite.testCorr(JavaStatisticsSuite.java:75)
```
The above assertion failure is not related to the change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14850] Show limit for array size when a...

2016-04-30 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/12814#discussion_r61670775
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
 ---
@@ -338,9 +338,10 @@ public UnsafeArrayData copy() {
   }
 
   public static UnsafeArrayData fromPrimitiveArray(int[] arr) {
-if (arr.length > (Integer.MAX_VALUE - 4) / 8) {
+int limit = (Integer.MAX_VALUE - 4) / 8;
+if (arr.length > limit) {
   throw new UnsupportedOperationException("Cannot convert this array 
to unsafe format as " +
-"it's too big.");
+"its length (" + arr.length + ") exceeds " + limit + ".");
--- End diff --

This is Java.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14850] Show limit for array size when a...

2016-04-30 Thread tedyu
GitHub user tedyu opened a pull request:

https://github.com/apache/spark/pull/12814

[SPARK-14850] Show limit for array size when array is too big

## What changes were proposed in this pull request?

This PR shows the size of the array and the limit when the array is too big.

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tedyu/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/12814.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #12814


commit ed47ef9994bbdd23aa14584da19e20483a35b1f2
Author: tedyu <yuzhih...@gmail.com>
Date:   2016-04-30T14:42:55Z

[SPARK-14850] Show limit for array size when array is too big




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


