[jira] [Created] (SAMZA-2627) Make StreamAppender extensible for sending messages to SystemProducer
Lakshmi Manasa Gaduputi created SAMZA-2627: -- Summary: Make StreamAppender extensible for sending messages to SystemProducer Key: SAMZA-2627 URL: https://issues.apache.org/jira/browse/SAMZA-2627 Project: Samza Issue Type: Improvement Reporter: Lakshmi Manasa Gaduputi Assignee: Lakshmi Manasa Gaduputi StreamAppender currently sends to the underlying SystemProducer the logEvents as OutgoingMessageEnvelope(systemStream, keyBytes = containerName, serializedMessage). This corresponds to the OME with both partition key and record key set to keyBytes. This ticket is to provide classes extending StreamAppender with higher control over the partition and record keys in the OME. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (SAMZA-2628) Include the localized resource lib directory in the classpath of AM container
Aditya Toomula created SAMZA-2628: - Summary: Include the localized resource lib directory in the classpath of AM container Key: SAMZA-2628 URL: https://issues.apache.org/jira/browse/SAMZA-2628 Project: Samza Issue Type: Bug Reporter: Aditya Toomula Assignee: Aditya Toomula This change is similar to "SAMZA-2364: Include the localized resource lib directory in the classpath of SamzaContainer" but this one is for AM container. This is required for Samza SQL UDFs when planning is done in AM. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [samza] atoomula opened a new pull request #1469: SAMZA-2628: Include the localized resource lib directory in the classpath of AM container
atoomula opened a new pull request #1469: URL: https://github.com/apache/samza/pull/1469 **Problem**: With Samza 1.5, planning has moved to AM. Planning for Samza Sql apps with custom UDFs requires the UDF lib to be localized and added to the class path of AM container. The latter part is not being done currently resulting in the app to not come up. **Fix**: Add UDF lib to class path of AM container. **API Changes**: None **Tests**: Added unit tests **Upgrade Instructions**: None **Usage Instructions**: None This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [samza] lakshmi-manasa-g opened a new pull request #1470: SAMZA-2627: Make StreamAppender extensible for sending messages to SystemProducer
lakshmi-manasa-g opened a new pull request #1470: URL: https://github.com/apache/samza/pull/1470 Issue: StreamAppender sets both partition key and record key = container name for OutgoingMessageEnvelope sent to underlying SystemProducer. This restricts how the classes extending StreamAppender send to SystemProducer. Change: Mark sendEventToSystemProducer as protected. Tests: existing tests pass, no functionality change. API changes: none Usage, upgrade instructions: none This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [samza] kw2542 commented on pull request #1469: SAMZA-2628: Include the localized resource lib directory in the classpath of AM container
kw2542 commented on pull request #1469: URL: https://github.com/apache/samza/pull/1469#issuecomment-789939379 LGTM, please follow https://cwiki.apache.org/confluence/display/SAMZA/SEP-25%3A+PR+Title+And+Description+Guidelines to standardize the PR description. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [samza] mynameborat commented on a change in pull request #1470: SAMZA-2627: Make StreamAppender extensible for sending messages to SystemProducer
mynameborat commented on a change in pull request #1470: URL: https://github.com/apache/samza/pull/1470#discussion_r586653498 ## File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java ## @@ -439,7 +439,7 @@ private void startTransferThread() { * Helper method to send a serialized log-event to the systemProducer, and increment respective methods. * @param serializedLogEvent */ - private void sendEventToSystemProducer(byte[] serializedLogEvent) { + protected void sendEventToSystemProducer(byte[] serializedLogEvent) { Review comment: Another thought here is to expose the part that needs to be overridden by the extenders so that the core aspects around sending an event to `systemProducer` can remain consistent. It allows consistent set of metrics to be tracked and the evolution around sending events will remain closed and common. Not sure if this is a major concern given the function as is light weight. Just throwing out another option for you to think about. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [samza] mynameborat commented on a change in pull request #1470: SAMZA-2627: Make StreamAppender extensible for sending messages to SystemProducer
mynameborat commented on a change in pull request #1470: URL: https://github.com/apache/samza/pull/1470#discussion_r586653498 ## File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java ## @@ -439,7 +439,7 @@ private void startTransferThread() { * Helper method to send a serialized log-event to the systemProducer, and increment respective methods. * @param serializedLogEvent */ - private void sendEventToSystemProducer(byte[] serializedLogEvent) { + protected void sendEventToSystemProducer(byte[] serializedLogEvent) { Review comment: Another thought here is to expose the part that needs to be overridden by the extenders so that the core aspects around sending an event to `systemProducer` can remain consistent. It allows consistent set of metrics to be tracked and the evolution around sending events will remain closed and common. It isn't a major concern given the function as is light weight. Just throwing out another option for you to think about. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [samza] shanthoosh commented on pull request #1469: SAMZA-2628: Include the localized resource lib directory in the classpath of AM container
shanthoosh commented on pull request #1469: URL: https://github.com/apache/samza/pull/1469#issuecomment-789955368 @atoomula It seems like some junit-tests are failing in the travis-CI(https://travis-ci.org/github/apache/samza/builds/761322186). Are they flaky or caused by the changes in this patch? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [samza] lakshmi-manasa-g commented on a change in pull request #1470: SAMZA-2627: Make StreamAppender extensible for sending messages to SystemProducer
lakshmi-manasa-g commented on a change in pull request #1470: URL: https://github.com/apache/samza/pull/1470#discussion_r586719978 ## File path: samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java ## @@ -439,7 +439,7 @@ private void startTransferThread() { * Helper method to send a serialized log-event to the systemProducer, and increment respective methods. * @param serializedLogEvent */ - private void sendEventToSystemProducer(byte[] serializedLogEvent) { + protected void sendEventToSystemProducer(byte[] serializedLogEvent) { Review comment: yep, i did consider that option but decided against to avoid proliferating StreamAppender with one liner methods. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[samza] branch master updated: SAMZA-2628: Include the localized resource lib directory in the classpath of AM container (#1469)
This is an automated email from the ASF dual-hosted git repository. atoomula pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git The following commit(s) were added to refs/heads/master by this push: new 66b2efd SAMZA-2628: Include the localized resource lib directory in the classpath of AM container (#1469) 66b2efd is described below commit 66b2efd1ee96436dd4f4af798da458bc5928d824 Author: Aditya Toomula AuthorDate: Wed Mar 3 12:29:19 2021 -0800 SAMZA-2628: Include the localized resource lib directory in the classpath of AM container (#1469) * SAMZA-2628: Include the localized resource lib directory in the classpath of AM container * SAMZA-2628: Include the localized resource lib directory in the classpath of AM container * Empty commit to trigger build. --- .../scala/org/apache/samza/job/yarn/YarnJob.scala | 2 ++ .../org/apache/samza/job/yarn/TestYarnJob.java | 34 +++--- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala index ca681f5..7e4565b 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala @@ -194,6 +194,8 @@ object YarnJob extends Logging { Option.apply(yarnConfig.getAMJavaHome).foreach { amJavaHome => envMapBuilder += ShellCommandConfig.ENV_JAVA_HOME -> amJavaHome } +envMapBuilder += ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR -> + Util.envVarEscape(config.get(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, "")) envMapBuilder.result() } diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java index 4858d76..f068800 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnJob.java @@ -65,7 +65,8 @@ public class TestYarnJob { Map expected = ImmutableMap.of( ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue, ShellCommandConfig.ENV_JAVA_OPTS, Util.envVarEscape(amJvmOptions), -ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "false"); +ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "false", +ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, ""); assertEquals(expected, JavaConverters.mapAsJavaMapConverter( YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava()); } @@ -85,7 +86,8 @@ public class TestYarnJob { ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue, ShellCommandConfig.ENV_JAVA_OPTS, "", ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true", -ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib"); +ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib", +ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, ""); assertEquals(expected, JavaConverters.mapAsJavaMapConverter( YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava()); } @@ -106,7 +108,8 @@ public class TestYarnJob { ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG, expectedCoordinatorStreamConfigStringValue, ShellCommandConfig.ENV_JAVA_OPTS, "", ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "false", -ShellCommandConfig.ENV_JAVA_HOME, "/some/path/to/java/home"); +ShellCommandConfig.ENV_JAVA_HOME, "/some/path/to/java/home", +ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, ""); assertEquals(expected, JavaConverters.mapAsJavaMapConverter( YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava()); } @@ -126,7 +129,30 @@ public class TestYarnJob { ShellCommandConfig.ENV_SUBMISSION_CONFIG, expectedSubmissionConfig, ShellCommandConfig.ENV_JAVA_OPTS, "", ShellCommandConfig.ENV_SPLIT_DEPLOYMENT_ENABLED, "true", -ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib"); +ShellCommandConfig.ENV_APPLICATION_LIB_DIR, "./__package/lib", +ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, ""); +assertEquals(expected, JavaConverters.mapAsJavaMapConverter( +YarnJob$.MODULE$.buildEnvironment(config, new YarnConfig(config), new JobConfig(config))).asJava()); + } + + @Test + public void testBuildJobWithAdditionalClassPath() throws IOException { +Config config = new MapConfig(new ImmutableMap.Builder() +.put(JobConfig.JOB_NAME, "jobName") +.put(JobConfig.JOB_ID, "jobId") +.put(JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfig
[GitHub] [samza] atoomula merged pull request #1469: SAMZA-2628: Include the localized resource lib directory in the classpath of AM container
atoomula merged pull request #1469: URL: https://github.com/apache/samza/pull/1469 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[samza] branch master updated: SAMZA-2627: Make StreamAppender extensible for sending messages to SystemProducer (#1470)
This is an automated email from the ASF dual-hosted git repository. bharathkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git The following commit(s) were added to refs/heads/master by this push: new 93c41df SAMZA-2627: Make StreamAppender extensible for sending messages to SystemProducer (#1470) 93c41df is described below commit 93c41dfbc776a1b0b24a947234d6d94981709a0e Author: lakshmi-manasa-g AuthorDate: Wed Mar 3 12:52:05 2021 -0800 SAMZA-2627: Make StreamAppender extensible for sending messages to SystemProducer (#1470) Issue: StreamAppender sets both partition key and record key = container name for OutgoingMessageEnvelope sent to underlying SystemProducer. This restricts how the classes extending StreamAppender send to SystemProducer. Change: Mark sendEventToSystemProducer as protected. --- .../src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java index bfba754..cadf63c 100644 --- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java +++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java @@ -439,7 +439,7 @@ public class StreamAppender extends AbstractAppender { * Helper method to send a serialized log-event to the systemProducer, and increment respective methods. * @param serializedLogEvent */ - private void sendEventToSystemProducer(byte[] serializedLogEvent) { + protected void sendEventToSystemProducer(byte[] serializedLogEvent) { metrics.logMessagesBytesSent.inc(serializedLogEvent.length); metrics.logMessagesCountSent.inc(); systemProducer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent));
[GitHub] [samza] mynameborat merged pull request #1470: SAMZA-2627: Make StreamAppender extensible for sending messages to SystemProducer
mynameborat merged pull request #1470: URL: https://github.com/apache/samza/pull/1470 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (SAMZA-2616) Upgrade ZK Client Version to 0.11
[ https://issues.apache.org/jira/browse/SAMZA-2616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stuart Perks updated SAMZA-2616: Fix Version/s: 1.7 > Upgrade ZK Client Version to 0.11 > - > > Key: SAMZA-2616 > URL: https://issues.apache.org/jira/browse/SAMZA-2616 > Project: Samza > Issue Type: Improvement >Reporter: Stuart Perks >Assignee: Stuart Perks >Priority: Minor > Fix For: 1.7 > > Time Spent: 2h 20m > Remaining Estimate: 0h > > The Zookeeper version sits at 3.4.6 with 3.6.2 now available. > The client is at 0.8 and has 0.11 available. Initial looks show that this > library is dated so is a much bigger change to upgrade. > This JIRA will upgrade the com.101tec:zkclient client to its latest version > of 0.11. > This will bring Zookeeper client version to 3.4.13. > This upgrade will bring enhancements, bugfixes and security patches. > [Code > Reference|https://github.com/apache/samza/blob/master/gradle/dependency-versions.gradle] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [samza] Sanil15 commented on pull request #1462: SAMZA-2617: Upgrade Kafka Client to 2.3.1
Sanil15 commented on pull request #1462: URL: https://github.com/apache/samza/pull/1462#issuecomment-790113532 Confirmed with boris, that we bump major version to 1.6 as soon as we cut 1.6 branch This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [samza] Sanil15 merged pull request #1462: SAMZA-2617: Upgrade Kafka Client to 2.3.1
Sanil15 merged pull request #1462: URL: https://github.com/apache/samza/pull/1462 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[samza] branch master updated (93c41df -> 89b71ed)
This is an automated email from the ASF dual-hosted git repository. saniljain15 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/samza.git. from 93c41df SAMZA-2627: Make StreamAppender extensible for sending messages to SystemProducer (#1470) add 89b71ed SAMZA-2617: Upgrade Kafka Client to 2.3.1 (#1462) No new revisions were added by this update. Summary of changes: gradle/dependency-versions.gradle | 2 +- .../samza/system/kafka/KafkaConsumerProxy.java | 14 - .../samza/system/kafka/KafkaSystemConsumer.java| 9 ++ .../system/kafka/KafkaSystemConsumerMetrics.scala | 34 +++--- .../samza/system/kafka/MockKafkaProducer.java | 6 .../kafka/TestKafkaSystemConsumerMetrics.java | 10 +++ .../samza/sql/client/impl/SamzaExecutor.java | 9 ++ .../samza/test/processor/TestStreamProcessor.java | 19 ++-- .../harness/AbstractKafkaServerTestHarness.scala | 2 +- .../harness/AbstractZookeeperTestHarness.scala | 9 ++ 10 files changed, 53 insertions(+), 61 deletions(-)