[jira] [Commented] (SAMZA-1712) Tear down of connections in ZkClient on interrupts.
[ https://issues.apache.org/jira/browse/SAMZA-1712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481381#comment-16481381 ]

ASF GitHub Bot commented on SAMZA-1712:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/samza/pull/519

> Tear down of connections in ZkClient on interrupts.
> ---
>
>                 Key: SAMZA-1712
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1712
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>            Priority: Major
>
> If a thread executing zkClient.close is interrupted, currently we swallow the
> ZkInterruptedException and proceed without closing the zookeeper connection.
> This leads to ephemeral nodes of StreamProcessor lurking around in zookeeper
> after StreamProcessor shutdown.
> Users had to wait till zookeeper server session timeout for the ephemeral
> nodes to get deleted.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
samza git commit: SAMZA-1712: Tear down ZookeeperServer connections in ZkClient on interrupts.
Repository: samza
Updated Branches:
  refs/heads/master 7a2b4650c -> 72ad7523f

SAMZA-1712: Tear down ZookeeperServer connections in ZkClient on interrupts.

**Problem:**
If a thread executing zkClient.close is interrupted, currently we swallow the ZkInterruptedException and proceed without closing the zookeeper connection. This leads to ephemeral nodes of StreamProcessor lurking around in zookeeper after StreamProcessor shutdown. Users had to wait till zookeeper server session timeout for the ephemeral nodes to get deleted.

**Change:**
Retry once on InterruptedException when closing the zkClient.

Misc changes:
* Remove unnecessary null checks.
* Remove unnecessary typecasts.

Author: Shanthoosh Venkataraman

Reviewers: Jagadish

Closes #519 from shanthoosh/handle_interrupted_exception_in_zkclient_close

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/72ad7523
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/72ad7523
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/72ad7523

Branch: refs/heads/master
Commit: 72ad7523fffdcafdc01a0c6922fc94ccd1e482a5
Parents: 7a2b465
Author: Shanthoosh Venkataraman
Authored: Fri May 18 17:29:00 2018 -0700
Committer: Jagadish
Committed: Fri May 18 17:29:00 2018 -0700

--
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 42 ++
 .../org/apache/samza/zk/ZkUtilsMetrics.java     |  6 ++
 .../java/org/apache/samza/zk/TestZkUtils.java   | 59 +---
 3 files changed, 73 insertions(+), 34 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/samza/blob/72ad7523/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
--
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 43f7d9c..6511603 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -78,7 +78,6 @@ public class ZkUtils {
   private static final Logger LOG = LoggerFactory.getLogger(ZkUtils.class);
   /* package private */static final String ZK_PROTOCOL_VERSION = "1.0";
-
   private final ZkClient zkClient;
   private volatile String ephemeralPath = null;
   private final ZkKeyBuilder keyBuilder;
@@ -105,9 +104,7 @@ public class ZkUtils {
   public void connect() throws ZkInterruptedException {
     boolean isConnected = zkClient.waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS);
     if (!isConnected) {
-      if (metrics != null) {
-        metrics.zkConnectionError.inc();
-      }
+      metrics.zkConnectionError.inc();
       throw new RuntimeException("Unable to connect to Zookeeper within connectionTimeout " + connectionTimeoutMs + "ms. Shutting down!");
     }
   }
@@ -144,6 +141,7 @@ public class ZkUtils {
         if (!isValidRegisteredProcessor(processorNode)) {
           LOG.info("Processor: {} is duplicate. Deleting zookeeper node at path: {}.", processorId, ephemeralPath);
           zkClient.delete(ephemeralPath);
+          metrics.deletions.inc();
           throw new SamzaException(String.format("Processor: %s is duplicate in the group. Registration failed.", processorId));
         }
       } else {
@@ -272,16 +270,12 @@ public class ZkUtils {
   public void subscribeDataChanges(String path, IZkDataListener dataListener) {
     zkClient.subscribeDataChanges(path, dataListener);
-    if (metrics != null) {
-      metrics.subscriptions.inc();
-    }
+    metrics.subscriptions.inc();
   }
   public void subscribeChildChanges(String path, IZkChildListener listener) {
     zkClient.subscribeChildChanges(path, listener);
-    if (metrics != null) {
-      metrics.subscriptions.inc();
-    }
+    metrics.subscriptions.inc();
   }
   public void unsubscribeChildChanges(String path, IZkChildListener childListener) {
@@ -290,9 +284,7 @@ public class ZkUtils {
   public void writeData(String path, Object object) {
     zkClient.writeData(path, object);
-    if (metrics != null) {
-      metrics.writes.inc();
-    }
+    metrics.writes.inc();
   }
   public boolean exists(String path) {
@@ -303,9 +295,10 @@ public class ZkUtils {
     try {
       zkClient.close();
     } catch (ZkInterruptedException e) {
-      // Swallowing due to occurrence in the last stage of lifecycle (Not actionable) and clear the interrupted status.
+      LOG.warn("Interrupted when closing zkClient. Clearing the interrupted status and retrying.", e);
       Thread.interrupted();
-      LOG.warn("Ignoring the exception when closing the zookeeper client.", e);
+      zkClient.close();
+      Thread.currentThread().interrupt();
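The retry-once behaviour described in the commit message above can be sketched in isolation. This is a minimal illustration, not the ZkUtils code: `Connection` is a hypothetical stand-in for the ZooKeeper client, and a plain `InterruptedException` is assumed in place of `ZkInterruptedException` so the example needs only the JDK.

```java
final class InterruptSafeClose {
  /** Hypothetical stand-in for a client whose close() can be cut short by an interrupt. */
  interface Connection {
    void close() throws InterruptedException;
  }

  /**
   * Closes the connection, retrying exactly once if the first attempt is interrupted.
   * Returns true if either attempt completed.
   */
  static boolean closeWithRetry(Connection connection) {
    try {
      connection.close();
      return true;
    } catch (InterruptedException firstFailure) {
      // Clear the interrupted status so the retry does not fail immediately.
      Thread.interrupted();
      try {
        connection.close();
        return true;
      } catch (InterruptedException retryFailure) {
        return false;
      } finally {
        // Restore the interrupted status so callers can still observe the interrupt.
        Thread.currentThread().interrupt();
      }
    }
  }
}
```

Restoring the interrupted status after the retry mirrors the `Thread.currentThread().interrupt()` line added in the diff, so code further up the stack can still react to the interrupt.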
[jira] [Resolved] (SAMZA-1508) JobRunner should not return success until the job is healthy
[ https://issues.apache.org/jira/browse/SAMZA-1508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jake Maes resolved SAMZA-1508.
--
    Resolution: Fixed

> JobRunner should not return success until the job is healthy
>
>
>                 Key: SAMZA-1508
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1508
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Jake Maes
>            Assignee: Jake Maes
>            Priority: Major
>             Fix For: 0.15.0
>
>
> It can be frustrating for users when run-app.sh returns success before the
> job was fully running.
> This happens because the JobRunner currently waits for JobStatus=RUNNING, but
> in Yarn for example, that happens when the AM is launched, not when all the
> containers are launched.
> What can go wrong?
> 1. The job could stay stuck waiting for containers that it cant get because
> of capacity issues or an outage.
> 2. The job containers may immediately fail due to a runtime error.
> In both cases, the user may go on their merry way because run-app.sh returned
> successfully, even though the job is already dead. They may not get alerted
> for some time.
> How do we fix?
> There are a few ways to fix it. Each one progressively harder but
> progressively better:
> 1. Make JobRunner reach out to AM and monitor the needed containers metric
> until it reaches 0
> 2. Expose a new healthy endpoint in the AM which is only set to true when a
> heartbeat has been received from each of the containers. Have the JobRunner
> wait on this (with a timeout)
> 3. Expose a hook where users can write custom logic to determine job health
> I think #1 is the most bang for buck and the implementation for #1 can easily
> be extended for #2 later.
> Other notes:
> I don't think this is needed for standalone, since users are directly
> deploying the processors and can monitor the processes directly.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
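Option 1 in the description above (poll the needed-containers metric until it reaches 0, with a timeout) can be sketched as a small polling loop. This is only an illustration under stated assumptions: `JobHealthWaiter`, the `neededContainers` supplier, and the timeout and poll-interval parameters are hypothetical names, and nothing here reflects the actual Samza AM REST API.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

final class JobHealthWaiter {
  /**
   * Polls the (hypothetical) needed-containers metric until it reaches 0.
   * Returns true if the job became healthy before the timeout, false on
   * timeout or if the waiting thread was interrupted.
   */
  static boolean waitUntilHealthy(IntSupplier neededContainers, long timeoutMs, long pollIntervalMs) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (true) {
      if (neededContainers.getAsInt() == 0) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollIntervalMs);
      } catch (InterruptedException e) {
        // Preserve the interrupt for the caller and report "not healthy".
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }
}
```

A caller would pass a supplier that reads the metric from wherever it is exposed, and treat a `false` result as a failed launch rather than a success.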
[jira] [Commented] (SAMZA-1508) JobRunner should not return success until the job is healthy
[ https://issues.apache.org/jira/browse/SAMZA-1508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481213#comment-16481213 ]

ASF GitHub Bot commented on SAMZA-1508:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/samza/pull/367

> JobRunner should not return success until the job is healthy
>
>
>                 Key: SAMZA-1508
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1508
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Jake Maes
>            Assignee: Jake Maes
>            Priority: Major
>             Fix For: 0.15.0
>
>
> It can be frustrating for users when run-app.sh returns success before the
> job was fully running.
> This happens because the JobRunner currently waits for JobStatus=RUNNING, but
> in Yarn for example, that happens when the AM is launched, not when all the
> containers are launched.
> What can go wrong?
> 1. The job could stay stuck waiting for containers that it cant get because
> of capacity issues or an outage.
> 2. The job containers may immediately fail due to a runtime error.
> In both cases, the user may go on their merry way because run-app.sh returned
> successfully, even though the job is already dead. They may not get alerted
> for some time.
> How do we fix?
> There are a few ways to fix it. Each one progressively harder but
> progressively better:
> 1. Make JobRunner reach out to AM and monitor the needed containers metric
> until it reaches 0
> 2. Expose a new healthy endpoint in the AM which is only set to true when a
> heartbeat has been received from each of the containers. Have the JobRunner
> wait on this (with a timeout)
> 3. Expose a hook where users can write custom logic to determine job health
> I think #1 is the most bang for buck and the implementation for #1 can easily
> be extended for #2 later.
> Other notes:
> I don't think this is needed for standalone, since users are directly
> deploying the processors and can monitor the processes directly.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
samza git commit: SAMZA-1508: JobRunner should not return success until the job is healthy
Repository: samza
Updated Branches:
  refs/heads/master 171793b69 -> 7a2b4650c

SAMZA-1508: JobRunner should not return success until the job is healthy

Author: Jacob Maes
Author: Jacob Maes

Reviewers: Prateek Maheshwari

Closes #367 from jmakes/samza-1508

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7a2b4650
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7a2b4650
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7a2b4650

Branch: refs/heads/master
Commit: 7a2b4650cd7c87f0475bc06d306cd4ef834377b0
Parents: 171793b
Author: Jacob Maes
Authored: Fri May 18 14:23:07 2018 -0700
Committer: Jacob Maes <--global>
Committed: Fri May 18 14:23:07 2018 -0700

--
 build.gradle                                    |   1 +
 .../scala/org/apache/samza/job/JobRunner.scala  |  38 +--
 .../webapp/ApplicationMasterRestClient.java     | 111 +++
 .../apache/samza/job/yarn/ClientHelper.scala    |  54 ++-
 .../org/apache/samza/job/yarn/YarnJob.scala     |  10 +-
 .../webapp/ApplicationMasterRestServlet.scala   |  76 +++--
 .../webapp/TestApplicationMasterRestClient.java | 330 +++
 .../samza/job/yarn/TestClientHelper.scala       |  36 +-
 8 files changed, 583 insertions(+), 73 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/samza/blob/7a2b4650/build.gradle
--
diff --git a/build.gradle b/build.gradle
index 2f27a03..0b4dae5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -448,6 +448,7 @@ project(":samza-yarn_$scalaVersion") {
     compile "org.scala-lang:scala-compiler:$scalaLibVersion"
     compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
     compile "commons-httpclient:commons-httpclient:$commonsHttpClientVersion"
+    compile "org.apache.httpcomponents:httpclient:$httpClientVersion"
     compile("org.apache.hadoop:hadoop-yarn-api:$yarnVersion") {
       exclude module: 'slf4j-log4j12'
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/7a2b4650/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
--
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 917c018..c6e14f2 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,6 +20,8 @@
 package org.apache.samza.job
+import java.util.concurrent.TimeUnit
+
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
 import org.apache.samza.config.JobConfig.Config2Job
@@ -117,23 +119,11 @@ class JobRunner(config: Config) extends Logging {
     coordinatorSystemProducer.stop()
     // Create the actual job, and submit it.
-    val job = jobFactory.getJob(config).submit
-
-    info("waiting for job to start")
-
-    // Wait until the job has started, then exit.
-    Option(job.waitForStatus(Running, 500)) match {
-      case Some(appStatus) => {
-        if (Running.equals(appStatus)) {
-          info("job started successfully - " + appStatus)
-        } else {
-          warn("unable to start job successfully. job has status %s" format (appStatus))
-        }
-      }
-      case _ => warn("unable to start job successfully.")
-    }
+    val job = jobFactory.getJob(config)
+
+    job.submit()
-    info("exiting")
+    info("Job submitted. Check status to determine when it is running.")
     job
   }
@@ -143,21 +133,7 @@ class JobRunner(config: Config) extends Logging {
     // Create the actual job, and kill it.
     val job = jobFactory.getJob(config).kill()
-    info("waiting for job to terminate")
-
-    // Wait until the job has terminated, then exit.
-    Option(job.waitForFinish(5000)) match {
-      case Some(appStatus) => {
-        if (SuccessfulFinish.equals(appStatus)) {
-          info("job terminated successfully - " + appStatus)
-        } else {
-          warn("unable to terminate job successfully. job has status %s" format (appStatus))
-        }
-      }
-      case _ => warn("unable to terminate job successfully.")
-    }
-
-    info("exiting")
+    info("Kill command executed. Check status to determine when it is terminated.")
   }
   def status(): ApplicationStatus = {

http://git-wip-us.apache.org/repos/asf/samza/blob/7a2b4650/samza-yarn/src/main/java/org/apache/samza/webapp/ApplicationMasterRestClient.java
--
diff --git a/samza-yarn/src/main/java/org/apache/samza/webapp/ApplicationMasterRestClient.java
[samza] Git Push Summary
Repository: samza Updated Tags: refs/tags/release-0.14.1-rc2 [created] c84935cac
samza git commit: Implementing the fetchSinkInfo in ConfigBasedIOResolver
Repository: samza
Updated Branches:
  refs/heads/0.14.1 69e63d815 -> 9bc03f7ab

Implementing the fetchSinkInfo in ConfigBasedIOResolver

1. I think we missed implementing the fetchSinkInfo method in the ConfigBasedResolver when the API was introduced which is breaking the samza sql console tool. This fixes it.
2. latest release of mac removed realpath so the command line tools are broken. Removed the usage of realpath to fix these tools.

Thanks to nickpan47 for identifying these problems.

Author: Srinivasulu Punuru

Reviewers: Yi Pan

Closes #528 from srinipunuru/release-fix.1

(cherry picked from commit 171793b69b33081fc6277c9505b3055f79fcb4b7)
Signed-off-by: xiliu

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9bc03f7a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9bc03f7a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9bc03f7a

Branch: refs/heads/0.14.1
Commit: 9bc03f7ab32199a15af157cc515d897917694cb5
Parents: 69e63d8
Author: Srinivasulu Punuru
Authored: Fri May 18 12:31:44 2018 -0700
Committer: xiliu
Committed: Fri May 18 12:31:57 2018 -0700

--
 .../samza/sql/impl/ConfigBasedIOResolverFactory.java | 13 -
 samza-tools/scripts/eh-consumer.sh                   |  2 +-
 samza-tools/scripts/generate-kafka-events.sh         |  2 +-
 samza-tools/scripts/samza-sql-console.sh             |  2 +-
 4 files changed, 11 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/samza/blob/9bc03f7a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
--
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
index 0887dc4..c604e71 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -19,7 +19,6 @@
 package org.apache.samza.sql.impl;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.TableDescriptor;
@@ -62,7 +61,11 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
     @Override
     public SqlIOConfig fetchSourceInfo(String source) {
-      String[] sourceComponents = source.split("\\.");
+      return fetchSystemInfo(source);
+    }
+
+    private SqlIOConfig fetchSystemInfo(String name) {
+      String[] sourceComponents = name.split("\\.");
       boolean isTable = isTable(sourceComponents);
       // This source resolver expects sources of format {systemName}.{streamName}[.$table]
@@ -86,7 +89,7 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
       }
       if (invalidQuery) {
-        String msg = String.format("Source %s is not of the format {systemName}.{streamName}[.%s]", source,
+        String msg = String.format("Source %s is not of the format {systemName}.{streamName}[.%s]", name,
             SAMZA_SQL_QUERY_TABLE_KEYWORD);
         LOG.error(msg);
         throw new SamzaException(msg);
@@ -97,7 +100,7 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
       TableDescriptor tableDescriptor = null;
       if (isTable) {
-        tableDescriptor = new RocksDbTableDescriptor("InputTable-" + source)
+        tableDescriptor = new RocksDbTableDescriptor("InputTable-" + name)
             .withSerde(KVSerde.of(
                 new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
                 new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
@@ -108,7 +111,7 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
     @Override
     public SqlIOConfig fetchSinkInfo(String sink) {
-      throw new NotImplementedException("No sink support in ConfigBasedIOResolver.");
+      return fetchSystemInfo(sink);
     }
     private boolean isTable(String[] sourceComponents) {

http://git-wip-us.apache.org/repos/asf/samza/blob/9bc03f7a/samza-tools/scripts/eh-consumer.sh
--
diff --git a/samza-tools/scripts/eh-consumer.sh b/samza-tools/scripts/eh-consumer.sh
index 363e028..382e05a 100755
--- a/samza-tools/scripts/eh-consumer.sh
+++ b/samza-tools/scripts/eh-consumer.sh
@@ -20,7 +20,7 @@
 if [ `uname` == 'Linux' ]; then
   base_dir=$(readlink -f $(dirname $0))
 else
-  base_dir=$(realpath $(dirname $0))
+  base_dir=$(dirname $0)
 fi
 if [ "x$LOG4J_OPTS" = "x" ]; then
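The change above routes both `fetchSourceInfo` and `fetchSinkInfo` through one shared helper that splits a `{systemName}.{streamName}[.$table]` name. A simplified, JDK-only sketch of that shape follows. `IoNameParser` and `IoName` are hypothetical names, the `$table` keyword value is assumed from the comment in the diff, and the validation (exactly two components plus an optional keyword) is a simplification that may be stricter than the real resolver.

```java
final class IoNameParser {
  // Assumed value of the table keyword, inferred from the "[.$table]" comment in the diff.
  static final String TABLE_KEYWORD = "$table";

  /** Hypothetical result type; the real code returns SqlIOConfig. */
  static final class IoName {
    final String systemName;
    final String streamName;
    final boolean isTable;

    IoName(String systemName, String streamName, boolean isTable) {
      this.systemName = systemName;
      this.streamName = streamName;
      this.isTable = isTable;
    }
  }

  /** Shared helper used for both sources and sinks. */
  static IoName parse(String name) {
    String[] components = name.split("\\.");
    boolean isTable = components.length > 1
        && components[components.length - 1].equalsIgnoreCase(TABLE_KEYWORD);
    int expectedLength = isTable ? 3 : 2;
    if (components.length != expectedLength) {
      throw new IllegalArgumentException(String.format(
          "Name %s is not of the format {systemName}.{streamName}[.%s]", name, TABLE_KEYWORD));
    }
    return new IoName(components[0], components[1], isTable);
  }

  static IoName fetchSourceInfo(String source) {
    return parse(source);
  }

  static IoName fetchSinkInfo(String sink) {
    return parse(sink);
  }
}
```

Delegating both entry points to one helper keeps the name format and its error message in a single place, which is the design choice the diff makes by renaming `source` to `name`.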
samza git commit: Implementing the fetchSinkInfo in ConfigBasedIOResolver
Repository: samza
Updated Branches:
  refs/heads/master 3b9e14be3 -> 171793b69

Implementing the fetchSinkInfo in ConfigBasedIOResolver

1. I think we missed implementing the fetchSinkInfo method in the ConfigBasedResolver when the API was introduced which is breaking the samza sql console tool. This fixes it.
2. latest release of mac removed realpath so the command line tools are broken. Removed the usage of realpath to fix these tools.

Thanks to nickpan47 for identifying these problems.

Author: Srinivasulu Punuru

Reviewers: Yi Pan

Closes #528 from srinipunuru/release-fix.1

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/171793b6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/171793b6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/171793b6

Branch: refs/heads/master
Commit: 171793b69b33081fc6277c9505b3055f79fcb4b7
Parents: 3b9e14b
Author: Srinivasulu Punuru
Authored: Fri May 18 12:31:44 2018 -0700
Committer: xiliu
Committed: Fri May 18 12:31:44 2018 -0700

--
 .../samza/sql/impl/ConfigBasedIOResolverFactory.java | 13 -
 samza-tools/scripts/eh-consumer.sh                   |  2 +-
 samza-tools/scripts/generate-kafka-events.sh         |  2 +-
 samza-tools/scripts/samza-sql-console.sh             |  2 +-
 4 files changed, 11 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/samza/blob/171793b6/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
--
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
index 0887dc4..c604e71 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -19,7 +19,6 @@
 package org.apache.samza.sql.impl;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.TableDescriptor;
@@ -62,7 +61,11 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
     @Override
     public SqlIOConfig fetchSourceInfo(String source) {
-      String[] sourceComponents = source.split("\\.");
+      return fetchSystemInfo(source);
+    }
+
+    private SqlIOConfig fetchSystemInfo(String name) {
+      String[] sourceComponents = name.split("\\.");
       boolean isTable = isTable(sourceComponents);
       // This source resolver expects sources of format {systemName}.{streamName}[.$table]
@@ -86,7 +89,7 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
       }
       if (invalidQuery) {
-        String msg = String.format("Source %s is not of the format {systemName}.{streamName}[.%s]", source,
+        String msg = String.format("Source %s is not of the format {systemName}.{streamName}[.%s]", name,
             SAMZA_SQL_QUERY_TABLE_KEYWORD);
         LOG.error(msg);
         throw new SamzaException(msg);
@@ -97,7 +100,7 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
       TableDescriptor tableDescriptor = null;
       if (isTable) {
-        tableDescriptor = new RocksDbTableDescriptor("InputTable-" + source)
+        tableDescriptor = new RocksDbTableDescriptor("InputTable-" + name)
             .withSerde(KVSerde.of(
                 new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
                 new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
@@ -108,7 +111,7 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
     @Override
     public SqlIOConfig fetchSinkInfo(String sink) {
-      throw new NotImplementedException("No sink support in ConfigBasedIOResolver.");
+      return fetchSystemInfo(sink);
     }
     private boolean isTable(String[] sourceComponents) {

http://git-wip-us.apache.org/repos/asf/samza/blob/171793b6/samza-tools/scripts/eh-consumer.sh
--
diff --git a/samza-tools/scripts/eh-consumer.sh b/samza-tools/scripts/eh-consumer.sh
index 363e028..382e05a 100755
--- a/samza-tools/scripts/eh-consumer.sh
+++ b/samza-tools/scripts/eh-consumer.sh
@@ -20,7 +20,7 @@
 if [ `uname` == 'Linux' ]; then
   base_dir=$(readlink -f $(dirname $0))
 else
-  base_dir=$(realpath $(dirname $0))
+  base_dir=$(dirname $0)
 fi
 if [ "x$LOG4J_OPTS" = "x" ]; then

http://git-wip-us.apache.org/repos/asf/samza/blob/171793b6/samza-tools/scripts/generate-kafka-events.sh
[jira] [Commented] (SAMZA-1720) Remove javafx.util dependency from samza-sql tests.
[ https://issues.apache.org/jira/browse/SAMZA-1720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481091#comment-16481091 ]

ASF GitHub Bot commented on SAMZA-1720:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/samza/pull/527

> Remove javafx.util dependency from samza-sql tests.
> ---
>
>                 Key: SAMZA-1720
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1720
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Priority: Major
>
> In the samza-sql module, a few test classes (TestSamzaSqlRelMessageSerde
> and TestSamzaSqlRelRecordSerde) currently depend on the javafx.util.Pair class.
> javafx.util.Pair is not supported by default in all JDK builds (for example,
> open-jdk java-8 doesn't support the javafx module) and it belongs to the javafx
> package, which is primarily used for developing GUI applications.
> This dependency has to be removed and should be replaced with either internal
> samza classes (KV, Entry) or Pair from apache-commons.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
samza git commit: SAMZA-1720: Remove javafx.util dependency from samza-sql tests.
Repository: samza
Updated Branches:
  refs/heads/master e44b333f1 -> 3b9e14be3

SAMZA-1720: Remove javafx.util dependency from samza-sql tests.

In samza-sql module, currently few test classes(`TestSamzaSqlRelMessageSerde` and `TestSamzaSqlRelRecordSerde`) are dependent upon `javafx.util.Pair` class(coming from `javafx` module).

`javafx.util.Pair` is not supported by default in all JDK builds(example; open-jdk java-8 doesn't support `javafx` module) and it belongs to `javafx` package which is primarily used for developing GUI applications.

This dependency is removed and replaced with `Pair` class from `apache-commons`.

Author: Shanthoosh Venkataraman

Reviewers: Jagadish V

Closes #527 from shanthoosh/SAMZA-1720

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3b9e14be
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3b9e14be
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3b9e14be

Branch: refs/heads/master
Commit: 3b9e14be3aa5fc2b6003b7e244fba466db34236f
Parents: e44b333
Author: Shanthoosh Venkataraman
Authored: Fri May 18 12:29:24 2018 -0700
Committer: xiliu
Committed: Fri May 18 12:29:24 2018 -0700

--
 .../java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java  | 4 ++--
 .../java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java   | 3 +--
 2 files changed, 3 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/samza/blob/3b9e14be/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
--
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
index 381a3cb..14ca3f0 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
@@ -23,10 +23,10 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import javafx.util.Pair;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.sql.avro.AvroRelConverter;
@@ -96,7 +96,7 @@ public class TestSamzaSqlRelMessageSerde {
     GenericData.Record streetNumRecord = new GenericData.Record(StreetNumRecord.SCHEMA$);
     streetNumRecord.put("number", 1200);
     addressRecord.put("streetnum", streetNumRecord);
-    return new Pair<>(nestedRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record)), record);
+    return Pair.of(nestedRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record)), record);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/3b9e14be/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
--
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
index 95b4972..25d1c77 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.avro.AvroRelConverter;
 import org.apache.samza.sql.avro.AvroRelSchemaProvider;
@@ -37,8 +38,6 @@ import org.apache.samza.system.SystemStream;
 import org.junit.Assert;
 import org.junit.Test;
-import javafx.util.Pair;
-
 import static org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde;
 import static org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord;
samza git commit: SAMZA-1720: Remove javafx.util dependency from samza-sql tests.
Repository: samza
Updated Branches:
  refs/heads/0.14.1 973e0735e -> 69e63d815

SAMZA-1720: Remove javafx.util dependency from samza-sql tests.

In samza-sql module, currently few test classes(`TestSamzaSqlRelMessageSerde` and `TestSamzaSqlRelRecordSerde`) are dependent upon `javafx.util.Pair` class(coming from `javafx` module).

`javafx.util.Pair` is not supported by default in all JDK builds(example; open-jdk java-8 doesn't support `javafx` module) and it belongs to `javafx` package which is primarily used for developing GUI applications.

This dependency is removed and replaced with `Pair` class from `apache-commons`.

Author: Shanthoosh Venkataraman

Reviewers: Jagadish V

Closes #527 from shanthoosh/SAMZA-1720

(cherry picked from commit 3b9e14be3aa5fc2b6003b7e244fba466db34236f)
Signed-off-by: xiliu

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/69e63d81
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/69e63d81
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/69e63d81

Branch: refs/heads/0.14.1
Commit: 69e63d8153b89cf47c618b3f4e871609d7b18a02
Parents: 973e073
Author: Shanthoosh Venkataraman
Authored: Fri May 18 12:29:24 2018 -0700
Committer: xiliu
Committed: Fri May 18 12:29:36 2018 -0700

--
 .../java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java  | 4 ++--
 .../java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java   | 3 +--
 2 files changed, 3 insertions(+), 4 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/samza/blob/69e63d81/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
--
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
index 381a3cb..14ca3f0 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java
@@ -23,10 +23,10 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import javafx.util.Pair;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.sql.avro.AvroRelConverter;
@@ -96,7 +96,7 @@ public class TestSamzaSqlRelMessageSerde {
     GenericData.Record streetNumRecord = new GenericData.Record(StreetNumRecord.SCHEMA$);
     streetNumRecord.put("number", 1200);
     addressRecord.put("streetnum", streetNumRecord);
-    return new Pair<>(nestedRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record)), record);
+    return Pair.of(nestedRecordAvroRelConverter.convertToRelMessage(new KV<>("key", record)), record);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/69e63d81/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
--
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
index 95b4972..25d1c77 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.avro.AvroRelConverter;
 import org.apache.samza.sql.avro.AvroRelSchemaProvider;
@@ -37,8 +38,6 @@ import org.apache.samza.system.SystemStream;
 import org.junit.Assert;
 import org.junit.Test;
-import javafx.util.Pair;
-
 import static org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde;
 import static org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord;
samza git commit: SAMZA-1715: Unit test for Kafka admin deletedMessagesCalled() fail
Repository: samza
Updated Branches:
  refs/heads/0.14.1 b2ca67ea1 -> 973e0735e

SAMZA-1715: Unit test for Kafka admin deletedMessagesCalled() fail

Test locally and works.

Author: Yi Pan (Data Infrastructure)

Reviewers: Jagadish

Closes #523 from nickpan47/fix-unittest-deleted-messages

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/973e0735
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/973e0735
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/973e0735

Branch: refs/heads/0.14.1
Commit: 973e0735e076101fa9c05e88df999cfdd289fd0a
Parents: b2ca67e
Author: Yi Pan (Data Infrastructure)
Authored: Wed May 16 22:19:37 2018 -0700
Committer: xiliu
Committed: Fri May 18 12:27:46 2018 -0700

--
 .../scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala    | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/samza/blob/973e0735/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
--
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index c76f6e5..a63db03 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -583,8 +583,6 @@ class KafkaSystemAdmin(
    * This only works with Kafka cluster 0.11 or later. Otherwise it's a no-op.
    */
   override def deleteMessages(offsets: util.Map[SystemStreamPartition, String]) {
-    deleteMessagesCalled = true
-
     if (!running) {
       throw new SamzaException(s"KafkaSystemAdmin has not started yet for system $systemName")
     }
@@ -593,6 +591,7 @@ class KafkaSystemAdmin(
       (new TopicPartition(systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), offset.toLong + 1)
     }.toMap
     adminClient.deleteRecordsBefore(nextOffsets)
+    deleteMessagesCalled = true
   }
 }
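The diff above moves the `deleteMessagesCalled = true` assignment from the top of `deleteMessages` to after the delete request. A minimal JDK-only sketch of why the ordering matters follows. `FlagAfterSuccessAdmin` and its methods are hypothetical stand-ins rather than the Kafka admin code: with the flag set last, a call that fails the `running` check no longer leaves the flag raised.

```java
import java.util.Map;

final class FlagAfterSuccessAdmin {
  private volatile boolean running = false;
  private volatile boolean deleteMessagesCalled = false;

  void start() {
    running = true;
  }

  void deleteMessages(Map<String, Long> offsets) {
    if (!running) {
      throw new IllegalStateException("admin has not started yet");
    }
    // A real implementation would issue the delete request for the given offsets here.
    // The flag is set only after that step, so it records completed calls, not attempts.
    deleteMessagesCalled = true;
  }

  boolean wasDeleteMessagesCalled() {
    return deleteMessagesCalled;
  }
}
```

A test that asserts on the flag then distinguishes "the deletion ran" from "the method was entered and threw".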