[jira] [Commented] (SAMZA-1712) Tear down of connections in ZkClient on interrupts.

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/SAMZA-1712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481381#comment-16481381
 ] 

ASF GitHub Bot commented on SAMZA-1712:
---

Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/519


> Tear down of connections in ZkClient on interrupts.
> ---
>
> Key: SAMZA-1712
> URL: https://issues.apache.org/jira/browse/SAMZA-1712
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Assignee: Shanthoosh Venkataraman
>Priority: Major
>
> If a thread executing zkClient.close is interrupted, currently we swallow the 
> ZkInterruptedException and proceed without closing the zookeeper connection. 
> This leads to ephemeral nodes of StreamProcessor lurking around in zookeeper 
> after StreamProcessor shutdown.
> Users had to wait till zookeeper server session timeout for the ephemeral 
> nodes to get deleted. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


samza git commit: SAMZA-1712: Tear down ZookeeperServer connections in ZkClient on interrupts.

2018-05-18 Thread jagadish
Repository: samza
Updated Branches:
  refs/heads/master 7a2b4650c -> 72ad7523f


SAMZA-1712: Tear down ZookeeperServer connections in ZkClient on interrupts.

**Problem:**

If a thread executing zkClient.close is interrupted, currently we swallow the 
ZkInterruptedException and proceed without closing the zookeeper connection.

This leads to ephemeral nodes of StreamProcessor lurking around in zookeeper 
after StreamProcessor shutdown.

Users had to wait till zookeeper server session timeout for the ephemeral nodes 
to get deleted.

**Change:**

Retry once on InterruptedException when closing the zkClient.

Misc changes:
* Remove unnecessary null checks.
* Remove unnecessary typecasts.

Author: Shanthoosh Venkataraman 

Reviewers: Jagadish 

Closes #519 from shanthoosh/handle_interrupted_exception_in_zkclient_close


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/72ad7523
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/72ad7523
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/72ad7523

Branch: refs/heads/master
Commit: 72ad7523fffdcafdc01a0c6922fc94ccd1e482a5
Parents: 7a2b465
Author: Shanthoosh Venkataraman 
Authored: Fri May 18 17:29:00 2018 -0700
Committer: Jagadish 
Committed: Fri May 18 17:29:00 2018 -0700

--
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 42 ++
 .../org/apache/samza/zk/ZkUtilsMetrics.java |  6 ++
 .../java/org/apache/samza/zk/TestZkUtils.java   | 59 +---
 3 files changed, 73 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/72ad7523/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
--
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 43f7d9c..6511603 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -78,7 +78,6 @@ public class ZkUtils {
   private static final Logger LOG = LoggerFactory.getLogger(ZkUtils.class);
   /* package private */static final String ZK_PROTOCOL_VERSION = "1.0";
 
-
   private final ZkClient zkClient;
   private volatile String ephemeralPath = null;
   private final ZkKeyBuilder keyBuilder;
@@ -105,9 +104,7 @@ public class ZkUtils {
   public void connect() throws ZkInterruptedException {
 boolean isConnected = zkClient.waitUntilConnected(connectionTimeoutMs, 
TimeUnit.MILLISECONDS);
 if (!isConnected) {
-  if (metrics != null) {
-metrics.zkConnectionError.inc();
-  }
+  metrics.zkConnectionError.inc();
   throw new RuntimeException("Unable to connect to Zookeeper within 
connectionTimeout " + connectionTimeoutMs + "ms. Shutting down!");
 }
   }
@@ -144,6 +141,7 @@ public class ZkUtils {
   if (!isValidRegisteredProcessor(processorNode)) {
 LOG.info("Processor: {} is duplicate. Deleting zookeeper node at path: 
{}.", processorId, ephemeralPath);
 zkClient.delete(ephemeralPath);
+metrics.deletions.inc();
 throw new SamzaException(String.format("Processor: %s is duplicate in 
the group. Registration failed.", processorId));
   }
 } else {
@@ -272,16 +270,12 @@ public class ZkUtils {
 
   public void subscribeDataChanges(String path, IZkDataListener dataListener) {
 zkClient.subscribeDataChanges(path, dataListener);
-if (metrics != null) {
-  metrics.subscriptions.inc();
-}
+metrics.subscriptions.inc();
   }
 
   public void subscribeChildChanges(String path, IZkChildListener listener) {
 zkClient.subscribeChildChanges(path, listener);
-if (metrics != null) {
-  metrics.subscriptions.inc();
-}
+metrics.subscriptions.inc();
   }
 
   public void unsubscribeChildChanges(String path, IZkChildListener 
childListener) {
@@ -290,9 +284,7 @@ public class ZkUtils {
 
   public void writeData(String path, Object object) {
 zkClient.writeData(path, object);
-if (metrics != null) {
-  metrics.writes.inc();
-}
+metrics.writes.inc();
   }
 
   public boolean exists(String path) {
@@ -303,9 +295,10 @@ public class ZkUtils {
 try {
   zkClient.close();
 } catch (ZkInterruptedException e) {
-  // Swallowing due to occurrence in the last stage of lifecycle (Not 
actionable) and clear the interrupted status.
+  LOG.warn("Interrupted when closing zkClient. Clearing the interrupted 
status and retrying.", e);
   Thread.interrupted();
-  LOG.warn("Ignoring the exception when closing the zookeeper client.", e);
+  zkClient.close();
+  Thread.currentThread().interrupt();
  

[jira] [Resolved] (SAMZA-1508) JobRunner should not return success until the job is healthy

2018-05-18 Thread Jake Maes (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jake Maes resolved SAMZA-1508.
--
Resolution: Fixed

> JobRunner should not return success until the job is healthy
> 
>
> Key: SAMZA-1508
> URL: https://issues.apache.org/jira/browse/SAMZA-1508
> Project: Samza
>  Issue Type: Bug
>Reporter: Jake Maes
>Assignee: Jake Maes
>Priority: Major
> Fix For: 0.15.0
>
>
> It can be frustrating for users when run-app.sh returns success before the 
> job was fully running.
> This happens because the JobRunner currently waits for JobStatus=RUNNING, but 
> in Yarn for example, that happens when the AM is launched, not when all the 
> containers are launched.
> What can go wrong?
> 1. The job could stay stuck waiting for containers that it cant get because 
> of capacity issues or an outage.
> 2. The job containers may immediately fail due to a runtime error.
> In both cases, the user may go on their merry way because run-app.sh returned 
> successfully, even though the job is already dead. They may not get alerted 
> for some time.
> How do we fix?
> There are a few ways to fix it. Each one progressively harder but 
> progressively better:
> 1. Make JobRunner reach out to AM and monitor the needed containers metric 
> until it reaches 0
> 2. Expose a new healthy endpoint in the AM which is only set to true when a 
> heartbeat has been received from each of the containers. Have the JobRunner 
> wait on this (with a timeout)
> 3. Expose a hook where users can write custom logic to determine job health
> I think #1 is the most bang for buck and the implementation for #1 can easily 
> be extended for #2 later.
> Other notes:
> I don't think this is needed for standalone, since users are directly 
> deploying the processors and can monitor the processes directly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (SAMZA-1508) JobRunner should not return success until the job is healthy

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/SAMZA-1508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481213#comment-16481213
 ] 

ASF GitHub Bot commented on SAMZA-1508:
---

Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/367


> JobRunner should not return success until the job is healthy
> 
>
> Key: SAMZA-1508
> URL: https://issues.apache.org/jira/browse/SAMZA-1508
> Project: Samza
>  Issue Type: Bug
>Reporter: Jake Maes
>Assignee: Jake Maes
>Priority: Major
> Fix For: 0.15.0
>
>
> It can be frustrating for users when run-app.sh returns success before the 
> job was fully running.
> This happens because the JobRunner currently waits for JobStatus=RUNNING, but 
> in Yarn for example, that happens when the AM is launched, not when all the 
> containers are launched.
> What can go wrong?
> 1. The job could stay stuck waiting for containers that it cant get because 
> of capacity issues or an outage.
> 2. The job containers may immediately fail due to a runtime error.
> In both cases, the user may go on their merry way because run-app.sh returned 
> successfully, even though the job is already dead. They may not get alerted 
> for some time.
> How do we fix?
> There are a few ways to fix it. Each one progressively harder but 
> progressively better:
> 1. Make JobRunner reach out to AM and monitor the needed containers metric 
> until it reaches 0
> 2. Expose a new healthy endpoint in the AM which is only set to true when a 
> heartbeat has been received from each of the containers. Have the JobRunner 
> wait on this (with a timeout)
> 3. Expose a hook where users can write custom logic to determine job health
> I think #1 is the most bang for buck and the implementation for #1 can easily 
> be extended for #2 later.
> Other notes:
> I don't think this is needed for standalone, since users are directly 
> deploying the processors and can monitor the processes directly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


samza git commit: SAMZA-1508: JobRunner should not return success until the job is healthy

2018-05-18 Thread jmakes
Repository: samza
Updated Branches:
  refs/heads/master 171793b69 -> 7a2b4650c


SAMZA-1508: JobRunner should not return success until the job is healthy

Author: Jacob Maes 
Author: Jacob Maes 

Reviewers: Prateek Maheshwari 

Closes #367 from jmakes/samza-1508


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7a2b4650
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7a2b4650
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7a2b4650

Branch: refs/heads/master
Commit: 7a2b4650cd7c87f0475bc06d306cd4ef834377b0
Parents: 171793b
Author: Jacob Maes 
Authored: Fri May 18 14:23:07 2018 -0700
Committer: Jacob Maes <--global>
Committed: Fri May 18 14:23:07 2018 -0700

--
 build.gradle|   1 +
 .../scala/org/apache/samza/job/JobRunner.scala  |  38 +--
 .../webapp/ApplicationMasterRestClient.java | 111 +++
 .../apache/samza/job/yarn/ClientHelper.scala|  54 ++-
 .../org/apache/samza/job/yarn/YarnJob.scala |  10 +-
 .../webapp/ApplicationMasterRestServlet.scala   |  76 +++--
 .../webapp/TestApplicationMasterRestClient.java | 330 +++
 .../samza/job/yarn/TestClientHelper.scala   |  36 +-
 8 files changed, 583 insertions(+), 73 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/7a2b4650/build.gradle
--
diff --git a/build.gradle b/build.gradle
index 2f27a03..0b4dae5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -448,6 +448,7 @@ project(":samza-yarn_$scalaVersion") {
 compile "org.scala-lang:scala-compiler:$scalaLibVersion"
 compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
 compile "commons-httpclient:commons-httpclient:$commonsHttpClientVersion"
+compile "org.apache.httpcomponents:httpclient:$httpClientVersion"
 compile("org.apache.hadoop:hadoop-yarn-api:$yarnVersion") {
   exclude module: 'slf4j-log4j12'
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/7a2b4650/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
--
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 
b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 917c018..c6e14f2 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,6 +20,8 @@
 package org.apache.samza.job
 
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
 import org.apache.samza.config.JobConfig.Config2Job
@@ -117,23 +119,11 @@ class JobRunner(config: Config) extends Logging {
 coordinatorSystemProducer.stop()
 
 // Create the actual job, and submit it.
-val job = jobFactory.getJob(config).submit
-
-info("waiting for job to start")
-
-// Wait until the job has started, then exit.
-Option(job.waitForStatus(Running, 500)) match {
-  case Some(appStatus) => {
-if (Running.equals(appStatus)) {
-  info("job started successfully - " + appStatus)
-} else {
-  warn("unable to start job successfully. job has status %s" format 
(appStatus))
-}
-  }
-  case _ => warn("unable to start job successfully.")
-}
+val job = jobFactory.getJob(config)
+
+job.submit()
 
-info("exiting")
+info("Job submitted. Check status to determine when it is running.")
 job
   }
 
@@ -143,21 +133,7 @@ class JobRunner(config: Config) extends Logging {
 // Create the actual job, and kill it.
 val job = jobFactory.getJob(config).kill()
 
-info("waiting for job to terminate")
-
-// Wait until the job has terminated, then exit.
-Option(job.waitForFinish(5000)) match {
-  case Some(appStatus) => {
-if (SuccessfulFinish.equals(appStatus)) {
-  info("job terminated successfully - " + appStatus)
-} else {
-  warn("unable to terminate job successfully. job has status %s" 
format (appStatus))
-}
-  }
-  case _ => warn("unable to terminate job successfully.")
-}
-
-info("exiting")
+info("Kill command executed. Check status to determine when it is 
terminated.")
   }
 
   def status(): ApplicationStatus = {

http://git-wip-us.apache.org/repos/asf/samza/blob/7a2b4650/samza-yarn/src/main/java/org/apache/samza/webapp/ApplicationMasterRestClient.java
--
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/webapp/ApplicationMasterRestClient.java
 

[samza] Git Push Summary

2018-05-18 Thread xinyu
Repository: samza
Updated Tags:  refs/tags/release-0.14.1-rc2 [created] c84935cac


samza git commit: Implementing the fetchSinkInfo in ConfigBasedIOResolver

2018-05-18 Thread xinyu
Repository: samza
Updated Branches:
  refs/heads/0.14.1 69e63d815 -> 9bc03f7ab


Implementing the fetchSinkInfo in ConfigBasedIOResolver

1. I think we missed implementing the fetchSinkInfo method in the 
ConfigBasedResolver when the API was introduced which is breaking the samza sql 
console tool. This fixes it.
2. latest release of mac removed realpath so the command line tools are broken. 
Removed the usage of realpath to fix these tools.

Thanks to nickpan47 for identifying these problems.

Author: Srinivasulu Punuru 

Reviewers: Yi Pan 

Closes #528 from srinipunuru/release-fix.1

(cherry picked from commit 171793b69b33081fc6277c9505b3055f79fcb4b7)
Signed-off-by: xiliu 


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9bc03f7a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9bc03f7a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9bc03f7a

Branch: refs/heads/0.14.1
Commit: 9bc03f7ab32199a15af157cc515d897917694cb5
Parents: 69e63d8
Author: Srinivasulu Punuru 
Authored: Fri May 18 12:31:44 2018 -0700
Committer: xiliu 
Committed: Fri May 18 12:31:57 2018 -0700

--
 .../samza/sql/impl/ConfigBasedIOResolverFactory.java   | 13 -
 samza-tools/scripts/eh-consumer.sh |  2 +-
 samza-tools/scripts/generate-kafka-events.sh   |  2 +-
 samza-tools/scripts/samza-sql-console.sh   |  2 +-
 4 files changed, 11 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/9bc03f7a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
--
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
index 0887dc4..c604e71 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.sql.impl;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.TableDescriptor;
@@ -62,7 +61,11 @@ public class ConfigBasedIOResolverFactory implements 
SqlIOResolverFactory {
 
 @Override
 public SqlIOConfig fetchSourceInfo(String source) {
-  String[] sourceComponents = source.split("\\.");
+  return fetchSystemInfo(source);
+}
+
+private SqlIOConfig fetchSystemInfo(String name) {
+  String[] sourceComponents = name.split("\\.");
   boolean isTable = isTable(sourceComponents);
 
   // This source resolver expects sources of format 
{systemName}.{streamName}[.$table]
@@ -86,7 +89,7 @@ public class ConfigBasedIOResolverFactory implements 
SqlIOResolverFactory {
   }
 
   if (invalidQuery) {
-String msg = String.format("Source %s is not of the format 
{systemName}.{streamName}[.%s]", source,
+String msg = String.format("Source %s is not of the format 
{systemName}.{streamName}[.%s]", name,
 SAMZA_SQL_QUERY_TABLE_KEYWORD);
 LOG.error(msg);
 throw new SamzaException(msg);
@@ -97,7 +100,7 @@ public class ConfigBasedIOResolverFactory implements 
SqlIOResolverFactory {
 
   TableDescriptor tableDescriptor = null;
   if (isTable) {
-tableDescriptor = new RocksDbTableDescriptor("InputTable-" + source)
+tableDescriptor = new RocksDbTableDescriptor("InputTable-" + name)
 .withSerde(KVSerde.of(
 new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
 new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
@@ -108,7 +111,7 @@ public class ConfigBasedIOResolverFactory implements 
SqlIOResolverFactory {
 
 @Override
 public SqlIOConfig fetchSinkInfo(String sink) {
-  throw new NotImplementedException("No sink support in 
ConfigBasedIOResolver.");
+  return fetchSystemInfo(sink);
 }
 
 private boolean isTable(String[] sourceComponents) {

http://git-wip-us.apache.org/repos/asf/samza/blob/9bc03f7a/samza-tools/scripts/eh-consumer.sh
--
diff --git a/samza-tools/scripts/eh-consumer.sh 
b/samza-tools/scripts/eh-consumer.sh
index 363e028..382e05a 100755
--- a/samza-tools/scripts/eh-consumer.sh
+++ b/samza-tools/scripts/eh-consumer.sh
@@ -20,7 +20,7 @@ if [ `uname` == 'Linux' ];
 then
   base_dir=$(readlink -f $(dirname $0))
 else
-  base_dir=$(realpath $(dirname $0))
+  base_dir=$(dirname $0)
 fi
 
 if [ "x$LOG4J_OPTS" = "x" ]; then


samza git commit: Implementing the fetchSinkInfo in ConfigBasedIOResolver

2018-05-18 Thread xinyu
Repository: samza
Updated Branches:
  refs/heads/master 3b9e14be3 -> 171793b69


Implementing the fetchSinkInfo in ConfigBasedIOResolver

1. I think we missed implementing the fetchSinkInfo method in the 
ConfigBasedResolver when the API was introduced which is breaking the samza sql 
console tool. This fixes it.
2. latest release of mac removed realpath so the command line tools are broken. 
Removed the usage of realpath to fix these tools.

Thanks to nickpan47 for identifying these problems.

Author: Srinivasulu Punuru 

Reviewers: Yi Pan 

Closes #528 from srinipunuru/release-fix.1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/171793b6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/171793b6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/171793b6

Branch: refs/heads/master
Commit: 171793b69b33081fc6277c9505b3055f79fcb4b7
Parents: 3b9e14b
Author: Srinivasulu Punuru 
Authored: Fri May 18 12:31:44 2018 -0700
Committer: xiliu 
Committed: Fri May 18 12:31:44 2018 -0700

--
 .../samza/sql/impl/ConfigBasedIOResolverFactory.java   | 13 -
 samza-tools/scripts/eh-consumer.sh |  2 +-
 samza-tools/scripts/generate-kafka-events.sh   |  2 +-
 samza-tools/scripts/samza-sql-console.sh   |  2 +-
 4 files changed, 11 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/171793b6/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
--
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
index 0887dc4..c604e71 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.sql.impl;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.TableDescriptor;
@@ -62,7 +61,11 @@ public class ConfigBasedIOResolverFactory implements 
SqlIOResolverFactory {
 
 @Override
 public SqlIOConfig fetchSourceInfo(String source) {
-  String[] sourceComponents = source.split("\\.");
+  return fetchSystemInfo(source);
+}
+
+private SqlIOConfig fetchSystemInfo(String name) {
+  String[] sourceComponents = name.split("\\.");
   boolean isTable = isTable(sourceComponents);
 
   // This source resolver expects sources of format 
{systemName}.{streamName}[.$table]
@@ -86,7 +89,7 @@ public class ConfigBasedIOResolverFactory implements 
SqlIOResolverFactory {
   }
 
   if (invalidQuery) {
-String msg = String.format("Source %s is not of the format 
{systemName}.{streamName}[.%s]", source,
+String msg = String.format("Source %s is not of the format 
{systemName}.{streamName}[.%s]", name,
 SAMZA_SQL_QUERY_TABLE_KEYWORD);
 LOG.error(msg);
 throw new SamzaException(msg);
@@ -97,7 +100,7 @@ public class ConfigBasedIOResolverFactory implements 
SqlIOResolverFactory {
 
   TableDescriptor tableDescriptor = null;
   if (isTable) {
-tableDescriptor = new RocksDbTableDescriptor("InputTable-" + source)
+tableDescriptor = new RocksDbTableDescriptor("InputTable-" + name)
 .withSerde(KVSerde.of(
 new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
 new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
@@ -108,7 +111,7 @@ public class ConfigBasedIOResolverFactory implements 
SqlIOResolverFactory {
 
 @Override
 public SqlIOConfig fetchSinkInfo(String sink) {
-  throw new NotImplementedException("No sink support in 
ConfigBasedIOResolver.");
+  return fetchSystemInfo(sink);
 }
 
 private boolean isTable(String[] sourceComponents) {

http://git-wip-us.apache.org/repos/asf/samza/blob/171793b6/samza-tools/scripts/eh-consumer.sh
--
diff --git a/samza-tools/scripts/eh-consumer.sh 
b/samza-tools/scripts/eh-consumer.sh
index 363e028..382e05a 100755
--- a/samza-tools/scripts/eh-consumer.sh
+++ b/samza-tools/scripts/eh-consumer.sh
@@ -20,7 +20,7 @@ if [ `uname` == 'Linux' ];
 then
   base_dir=$(readlink -f $(dirname $0))
 else
-  base_dir=$(realpath $(dirname $0))
+  base_dir=$(dirname $0)
 fi
 
 if [ "x$LOG4J_OPTS" = "x" ]; then

http://git-wip-us.apache.org/repos/asf/samza/blob/171793b6/samza-tools/scripts/generate-kafka-events.sh

[jira] [Commented] (SAMZA-1720) Remove javafx.util dependency from samza-sql tests.

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/SAMZA-1720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481091#comment-16481091
 ] 

ASF GitHub Bot commented on SAMZA-1720:
---

Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/527


> Remove javafx.util dependency from samza-sql tests.
> ---
>
> Key: SAMZA-1720
> URL: https://issues.apache.org/jira/browse/SAMZA-1720
> Project: Samza
>  Issue Type: Bug
>Reporter: Shanthoosh Venkataraman
>Priority: Major
>
> In samza-sql module, currently few test classes(TestSamzaSqlRelMessageSerde 
> and TestSamzaSqlRelRecordSerde) are dependent upon javax.util.Pair class.
> javafx.util.Pair is not supported by default in all JDK builds(example; 
> open-jdk java-8 doesn't support javafx module) and it belongs to javafx 
> package which is primarily used for developing GUI applications. 
> This dependency has to be removed and should be replaced with either internal 
> samza classes(KV, Entry) or Pair from apache-commons.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


samza git commit: SAMZA-1720: Remove javafx.util dependency from samza-sql tests.

2018-05-18 Thread xinyu
Repository: samza
Updated Branches:
  refs/heads/master e44b333f1 -> 3b9e14be3


SAMZA-1720: Remove javafx.util dependency from samza-sql tests.

In samza-sql module, currently few test classes(`TestSamzaSqlRelMessageSerde` 
and `TestSamzaSqlRelRecordSerde`) are dependent upon `javafx.util.Pair` 
class(coming from `javafx` module). `javafx.util.Pair` is not supported by 
default in all JDK builds(example; open-jdk java-8 doesn't support `javafx` 
module) and it belongs to `javafx` package which is primarily used for 
developing GUI applications. This dependency is removed and replaced with 
`Pair` class from `apache-commons`.

Author: Shanthoosh Venkataraman 

Reviewers: Jagadish V 

Closes #527 from shanthoosh/SAMZA-1720


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3b9e14be
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3b9e14be
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3b9e14be

Branch: refs/heads/master
Commit: 3b9e14be3aa5fc2b6003b7e244fba466db34236f
Parents: e44b333
Author: Shanthoosh Venkataraman 
Authored: Fri May 18 12:29:24 2018 -0700
Committer: xiliu 
Committed: Fri May 18 12:29:24 2018 -0700

--
 .../java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java   | 4 ++--
 .../java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java| 3 +--
 2 files changed, 3 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/3b9e14be/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
--
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
index 381a3cb..14ca3f0 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
@@ -23,10 +23,10 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import javafx.util.Pair;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.sql.avro.AvroRelConverter;
@@ -96,7 +96,7 @@ public class TestSamzaSqlRelMessageSerde {
 GenericData.Record streetNumRecord = new 
GenericData.Record(StreetNumRecord.SCHEMA$);
 streetNumRecord.put("number", 1200);
 addressRecord.put("streetnum", streetNumRecord);
-return new Pair<>(nestedRecordAvroRelConverter.convertToRelMessage(new 
KV<>("key", record)), record);
+return Pair.of(nestedRecordAvroRelConverter.convertToRelMessage(new 
KV<>("key", record)), record);
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/3b9e14be/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
--
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
index 95b4972..25d1c77 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.avro.AvroRelConverter;
 import org.apache.samza.sql.avro.AvroRelSchemaProvider;
@@ -37,8 +38,6 @@ import org.apache.samza.system.SystemStream;
 import org.junit.Assert;
 import org.junit.Test;
 
-import javafx.util.Pair;
-
 import static 
org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde;
 import static org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord;
 



samza git commit: SAMZA-1720: Remove javafx.util dependency from samza-sql tests.

2018-05-18 Thread xinyu
Repository: samza
Updated Branches:
  refs/heads/0.14.1 973e0735e -> 69e63d815


SAMZA-1720: Remove javafx.util dependency from samza-sql tests.

In samza-sql module, currently few test classes(`TestSamzaSqlRelMessageSerde` 
and `TestSamzaSqlRelRecordSerde`) are dependent upon `javafx.util.Pair` 
class(coming from `javafx` module). `javafx.util.Pair` is not supported by 
default in all JDK builds(example; open-jdk java-8 doesn't support `javafx` 
module) and it belongs to `javafx` package which is primarily used for 
developing GUI applications. This dependency is removed and replaced with 
`Pair` class from `apache-commons`.

Author: Shanthoosh Venkataraman 

Reviewers: Jagadish V 

Closes #527 from shanthoosh/SAMZA-1720

(cherry picked from commit 3b9e14be3aa5fc2b6003b7e244fba466db34236f)
Signed-off-by: xiliu 


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/69e63d81
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/69e63d81
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/69e63d81

Branch: refs/heads/0.14.1
Commit: 69e63d8153b89cf47c618b3f4e871609d7b18a02
Parents: 973e073
Author: Shanthoosh Venkataraman 
Authored: Fri May 18 12:29:24 2018 -0700
Committer: xiliu 
Committed: Fri May 18 12:29:36 2018 -0700

--
 .../java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java   | 4 ++--
 .../java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java| 3 +--
 2 files changed, 3 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/69e63d81/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
--
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
index 381a3cb..14ca3f0 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
@@ -23,10 +23,10 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import javafx.util.Pair;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.sql.avro.AvroRelConverter;
@@ -96,7 +96,7 @@ public class TestSamzaSqlRelMessageSerde {
 GenericData.Record streetNumRecord = new 
GenericData.Record(StreetNumRecord.SCHEMA$);
 streetNumRecord.put("number", 1200);
 addressRecord.put("streetnum", streetNumRecord);
-return new Pair<>(nestedRecordAvroRelConverter.convertToRelMessage(new 
KV<>("key", record)), record);
+return Pair.of(nestedRecordAvroRelConverter.convertToRelMessage(new 
KV<>("key", record)), record);
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/69e63d81/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
--
diff --git 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
index 95b4972..25d1c77 100644
--- 
a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
+++ 
b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.avro.AvroRelConverter;
 import org.apache.samza.sql.avro.AvroRelSchemaProvider;
@@ -37,8 +38,6 @@ import org.apache.samza.system.SystemStream;
 import org.junit.Assert;
 import org.junit.Test;
 
-import javafx.util.Pair;
-
 import static 
org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde;
 import static org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord;
 



samza git commit: SAMZA-1715: Unit test for Kafka admin deletedMessagesCalled() fail

2018-05-18 Thread xinyu
Repository: samza
Updated Branches:
  refs/heads/0.14.1 b2ca67ea1 -> 973e0735e


SAMZA-1715: Unit test for Kafka admin deletedMessagesCalled() fail

Test locally and works.

Author: Yi Pan (Data Infrastructure) 

Reviewers: Jagadish 

Closes #523 from nickpan47/fix-unittest-deleted-messages


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/973e0735
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/973e0735
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/973e0735

Branch: refs/heads/0.14.1
Commit: 973e0735e076101fa9c05e88df999cfdd289fd0a
Parents: b2ca67e
Author: Yi Pan (Data Infrastructure) 
Authored: Wed May 16 22:19:37 2018 -0700
Committer: xiliu 
Committed: Fri May 18 12:27:46 2018 -0700

--
 .../scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala| 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/973e0735/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
--
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index c76f6e5..a63db03 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -583,8 +583,6 @@ class KafkaSystemAdmin(
 * This only works with Kafka cluster 0.11 or later. Otherwise it's a no-op.
 */
   override def deleteMessages(offsets: util.Map[SystemStreamPartition, 
String]) {
-deleteMessagesCalled = true
-
 if (!running) {
   throw new SamzaException(s"KafkaSystemAdmin has not started yet for 
system $systemName")
 }
@@ -593,6 +591,7 @@ class KafkaSystemAdmin(
 (new TopicPartition(systemStreamPartition.getStream, 
systemStreamPartition.getPartition.getPartitionId), offset.toLong + 1)
   }.toMap
   adminClient.deleteRecordsBefore(nextOffsets)
+  deleteMessagesCalled = true
 }
   }