[jira] [Commented] (SAMZA-1317) Regression: changelog topics do not allow periods in the topic name
[ https://issues.apache.org/jira/browse/SAMZA-1317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035785#comment-16035785 ]

ASF GitHub Bot commented on SAMZA-1317:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/samza/pull/213

> Regression: changelog topics do not allow periods in the topic name
> ---
>
> Key: SAMZA-1317
> URL: https://issues.apache.org/jira/browse/SAMZA-1317
> Project: Samza
> Issue Type: Bug
> Reporter: Prateek Maheshwari
> Assignee: Jake Maes
> Fix For: 0.13.0
>
> Changelog topic configuration currently does not work if we try to use a
> streams.[streamId] based configuration for the topic name. E.g.,
> {code}
> streams.my-changelog-topic.samza.system = my-system
> streams.my-changelog-topic.samza.physical.name = my.changelog.topic.name
> stores.my-store.changelog = my-changelog-topic
> {code}
> Expected changelog topic name = my.changelog.topic.name
> Actual changelog topic name = my-changelog-topic
> We should resolve the provided system.topic configuration value against the
> new streams.[streamId] based configurations.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
samza git commit: SAMZA-1317: Changelog validation error for topics with period in the
Repository: samza
Updated Branches:
  refs/heads/master 373ae1c6d -> a4174309a

SAMZA-1317: Changelog validation error for topics with period in the

Author: Jacob Maes
Reviewers: Prateek Maheshwari

Closes #213 from jmakes/samza-1317-migration-fix

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a4174309
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a4174309
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a4174309

Branch: refs/heads/master
Commit: a4174309a495ff1e43c3d4c82cababa54dd71d54
Parents: 373ae1c
Author: Jacob Maes
Authored: Fri Jun 2 20:42:14 2017 -0700
Committer: Jacob Maes
Committed: Fri Jun 2 20:42:14 2017 -0700

--
 .../samza/system/kafka/KafkaStreamSpec.java    |  7 +--
 .../samza/system/kafka/KafkaSystemAdmin.scala  |  4 +++-
 .../system/kafka/TestKafkaSystemAdminJava.java | 19 +++
 3 files changed, 27 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/samza/blob/a4174309/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
--
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
index 0477854..c7e82f7 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
@@ -116,12 +116,15 @@ public class KafkaStreamSpec extends StreamSpec {
   /**
    * Convenience constructor to create a KafkaStreamSpec with just a topicName, systemName, and partitionCount.
    *
+   * @param id The application-unique logical identifier for the stream. It is used to distinguish between
+   *           streams in a Samza application so it must be unique in the context of one deployable unit.
+   *           It does not need to be globally unique or unique with respect to a host.
    * @param topicName The name of the topic.
    * @param systemName The name of the System. See {@link org.apache.samza.system.SystemFactory}
    * @param partitionCount The number of partitions.
    */
-  public KafkaStreamSpec(String topicName, String systemName, int partitionCount) {
-    this(topicName, topicName, systemName, partitionCount, DEFAULT_REPLICATION_FACTOR, new Properties());
+  public KafkaStreamSpec(String id, String topicName, String systemName, int partitionCount) {
+    this(id, topicName, systemName, partitionCount, DEFAULT_REPLICATION_FACTOR, new Properties());
   }

   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/a4174309/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
--
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 2f82754..af77d5b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -21,6 +21,7 @@ package org.apache.samza.system.kafka

 import java.util
 import java.util.{Properties, UUID}
+
 import kafka.admin.AdminUtils
 import kafka.api._
 import kafka.common.TopicAndPartition
@@ -31,6 +32,7 @@ import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadat
 import org.apache.samza.system._
 import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, Logging}
 import org.apache.samza.{Partition, SamzaException}
+
 import scala.collection.JavaConverters._


@@ -514,7 +516,7 @@ class KafkaSystemAdmin(
    * will auto-create a new topic.
    */
   override def validateChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
-    validateStream(new KafkaStreamSpec(topicName, systemName, numKafkaChangelogPartitions))
+    validateStream(new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions))
   }

   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/a4174309/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
--
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index a47ba9d..ce59b40 100644
---
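The change to validateChangelogStream passes CHANGELOG_STREAMID instead of the topic name as the stream id. A minimal sketch of why that matters follows. It assumes stream ids are limited to letters, digits, hyphens and underscores; that pattern and the class and method names (StreamIdSketch, isValidStreamId) are illustrative assumptions, not quoted from the Samza source. Only the placeholder value is taken from the patch.

```java
import java.util.regex.Pattern;

class StreamIdSketch {
    // Assumed rule, for illustration only: letters, digits, '-' and '_'.
    private static final Pattern ASSUMED_STREAM_ID = Pattern.compile("[A-Za-z0-9_-]+");

    // Same value as KafkaSystemAdmin.CHANGELOG_STREAMID in the patch.
    static final String CHANGELOG_STREAMID = "unused-temp-changelog-stream-id";

    // Hypothetical check standing in for whatever validation the stream spec performs.
    static boolean isValidStreamId(String id) {
        return id != null && ASSUMED_STREAM_ID.matcher(id).matches();
    }

    public static void main(String[] args) {
        String topicName = "my.changelog.topic.name";
        // A physical topic name containing periods fails the assumed id rule,
        // so reusing it as the stream id would be rejected.
        System.out.println(topicName + " usable as id: " + isValidStreamId(topicName));
        // The fixed placeholder id passes regardless of the topic name.
        System.out.println(CHANGELOG_STREAMID + " usable as id: " + isValidStreamId(CHANGELOG_STREAMID));
    }
}
```

Under that assumption, keeping the id and the physical name separate lets the topic name contain any character the underlying system accepts.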
[jira] [Commented] (SAMZA-1317) Regression: changelog topics do not allow periods in the topic name
[ https://issues.apache.org/jira/browse/SAMZA-1317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035780#comment-16035780 ]

ASF GitHub Bot commented on SAMZA-1317:
---

GitHub user jmakes opened a pull request:

    https://github.com/apache/samza/pull/213

    SAMZA-1317: Changelog validation error for topics with period in the

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jmakes/samza samza-1317-migration-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/samza/pull/213.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #213

commit ba05b9f24aac8f9cfc38b4805ced54722067e34d
Author: Jacob Maes
Date: 2017-06-02T21:42:45Z

    SAMZA-1317: Changelog topic configuration should accept streams.[streamId] configurations

commit 6ffb25fdfaf44294b756017c3220f215dde798b4
Author: Jacob Maes
Date: 2017-06-02T21:44:03Z

    Rm unnecessary whitespace

commit a4d7cadf6331a32d96d863a36b57322b07980f3e
Author: Jacob Maes
Date: 2017-06-03T03:17:43Z

    SAMZA-1317: Changelog validation error for topics with period in the name

commit 10f4a583bfbf70dd58c84c4697b24cecd84f6376
Author: Jacob Maes
Date: 2017-06-03T03:19:26Z

    remove gradle snippet for testing

commit 60c79bf5042e9c4eef90c97a61b1ee3483791bd3
Author: Jacob Maes
Date: 2017-06-03T03:21:55Z

    Merge branch 'master' into samza-1317-migration-fix

> Regression: changelog topics do not allow periods in the topic name
> ---
>
> Key: SAMZA-1317
> URL: https://issues.apache.org/jira/browse/SAMZA-1317
> Project: Samza
> Issue Type: Bug
> Reporter: Prateek Maheshwari
> Assignee: Jake Maes
> Fix For: 0.13.0
>
> Changelog topic configuration currently does not work if we try to use a
> streams.[streamId] based configuration for the topic name. E.g.,
> {code}
> streams.my-changelog-topic.samza.system = my-system
> streams.my-changelog-topic.samza.physical.name = my.changelog.topic.name
> stores.my-store.changelog = my-changelog-topic
> {code}
> Expected changelog topic name = my.changelog.topic.name
> Actual changelog topic name = my-changelog-topic
> We should resolve the provided system.topic configuration value against the
> new streams.[streamId] based configurations.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[samza] Git Push Summary
Repository: samza
Updated Tags:
  refs/tags/release-0.13.0-rc5 [created] f15254cd5
[jira] [Updated] (SAMZA-1317) Regression: changelog topics do not allow periods in the topic name
[ https://issues.apache.org/jira/browse/SAMZA-1317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jake Maes updated SAMZA-1317:
---
    Summary: Regression: changelog topics do not allow periods in the topic name (was: Changelog topic configuration should accept streams.[streamId] configurations)

> Regression: changelog topics do not allow periods in the topic name
> ---
>
> Key: SAMZA-1317
> URL: https://issues.apache.org/jira/browse/SAMZA-1317
> Project: Samza
> Issue Type: Bug
> Reporter: Prateek Maheshwari
> Assignee: Jake Maes
> Fix For: 0.13.0
>
> Changelog topic configuration currently does not work if we try to use a
> streams.[streamId] based configuration for the topic name. E.g.,
> {code}
> streams.my-changelog-topic.samza.system = my-system
> streams.my-changelog-topic.samza.physical.name = my.changelog.topic.name
> stores.my-store.changelog = my-changelog-topic
> {code}
> Expected changelog topic name = my.changelog.topic.name
> Actual changelog topic name = my-changelog-topic
> We should resolve the provided system.topic configuration value against the
> new streams.[streamId] based configurations.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Updated] (SAMZA-1320) Changelog topic configuration should accept streams.[streamId] configurations
[ https://issues.apache.org/jira/browse/SAMZA-1320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jake Maes updated SAMZA-1320:
---
    Description:
This is a follow up to SAMZA-1317, which solved the migration issue but doesn't allow users to use streams.[streamId].* to set properties on the changelog.
After this ticket, users will configure the changelog on a store as a streamId, like this:
{code}
streams.my-changelog-topic.samza.system = my-system
streams.my-changelog-topic.samza.physical.name = my.changelog.topic.name
streams.my-changelog-topic.some-system-specific-property = some-system-specific-value
stores.my-store.changelog = my-changelog-topic
{code}
We should resolve the provided system.topic configuration value against the new streams.[streamId] based configurations.

  was:
Changelog topic configuration currently does not work if we try to use a streams.[streamId] based configuration for the topic name. E.g.,
{code}
streams.my-changelog-topic.samza.system = my-system
streams.my-changelog-topic.samza.physical.name = my.changelog.topic.name
stores.my-store.changelog = my-changelog-topic
{code}
Expected changelog topic name = my.changelog.topic.name
Actual changelog topic name = my-changelog-topic
We should resolve the provided system.topic configuration value against the new streams.[streamId] based configurations.

> Changelog topic configuration should accept streams.[streamId] configurations
> ---
>
> Key: SAMZA-1320
> URL: https://issues.apache.org/jira/browse/SAMZA-1320
> Project: Samza
> Issue Type: Bug
> Reporter: Jake Maes
> Assignee: Jake Maes
> Fix For: 0.13.0
>
> This is a follow up to SAMZA-1317, which solved the migration issue but
> doesn't allow users to use streams.[streamId].* to set properties on the
> changelog.
> After this ticket, users will configure the changelog on a store as a
> streamId, like this:
> {code}
> streams.my-changelog-topic.samza.system = my-system
> streams.my-changelog-topic.samza.physical.name = my.changelog.topic.name
> streams.my-changelog-topic.some-system-specific-property = some-system-specific-value
> stores.my-store.changelog = my-changelog-topic
> {code}
> We should resolve the provided system.topic configuration value against the
> new streams.[streamId] based configurations.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
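The resolution requested in this ticket can be sketched roughly as follows. This is an illustration under stated assumptions, not Samza's implementation: ChangelogConfigSketch and resolveChangelogTopic are hypothetical names, the configuration is modeled as a plain Map, and the fallback to the stream id when no physical name is configured is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

class ChangelogConfigSketch {
    // Hypothetical helper (not a Samza API): look up the store's changelog value,
    // treat it as a stream id, and resolve it to the configured physical topic name.
    static String resolveChangelogTopic(Map<String, String> config, String storeName) {
        String streamId = config.get("stores." + storeName + ".changelog");
        if (streamId == null) {
            return null; // the store has no changelog configured
        }
        String physicalName = config.get("streams." + streamId + ".samza.physical.name");
        // Assumption: with no physical name configured, the stream id doubles as the topic name.
        return physicalName != null ? physicalName : streamId;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("streams.my-changelog-topic.samza.system", "my-system");
        config.put("streams.my-changelog-topic.samza.physical.name", "my.changelog.topic.name");
        config.put("stores.my-store.changelog", "my-changelog-topic");

        // Prints the expected name from the ticket: my.changelog.topic.name
        System.out.println(resolveChangelogTopic(config, "my-store"));
    }
}
```

With the three properties from the ticket, the lookup yields the expected topic name rather than the stream id.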
[jira] [Created] (SAMZA-1320) Changelog topic configuration should accept streams.[streamId] configurations
Jake Maes created SAMZA-1320:
---

    Summary: Changelog topic configuration should accept streams.[streamId] configurations
    Key: SAMZA-1320
    URL: https://issues.apache.org/jira/browse/SAMZA-1320
    Project: Samza
    Issue Type: Bug
    Reporter: Jake Maes
    Assignee: Jake Maes
    Fix For: 0.13.0

Changelog topic configuration currently does not work if we try to use a streams.[streamId] based configuration for the topic name. E.g.,
{code}
streams.my-changelog-topic.samza.system = my-system
streams.my-changelog-topic.samza.physical.name = my.changelog.topic.name
stores.my-store.changelog = my-changelog-topic
{code}
Expected changelog topic name = my.changelog.topic.name
Actual changelog topic name = my-changelog-topic
We should resolve the provided system.topic configuration value against the new streams.[streamId] based configurations.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Resolved] (SAMZA-1317) Changelog topic configuration should accept streams.[streamId] configurations
[ https://issues.apache.org/jira/browse/SAMZA-1317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jake Maes resolved SAMZA-1317.
---
    Resolution: Fixed
    Fix Version/s: (was: 0.14.0)
                   0.13.0

Issue resolved by pull request 212
[https://github.com/apache/samza/pull/212]

> Changelog topic configuration should accept streams.[streamId] configurations
> ---
>
> Key: SAMZA-1317
> URL: https://issues.apache.org/jira/browse/SAMZA-1317
> Project: Samza
> Issue Type: Bug
> Reporter: Prateek Maheshwari
> Assignee: Jake Maes
> Fix For: 0.13.0
>
> Changelog topic configuration currently does not work if we try to use a
> streams.[streamId] based configuration for the topic name. E.g.,
> {code}
> streams.my-changelog-topic.samza.system = my-system
> streams.my-changelog-topic.samza.physical.name = my.changelog.topic.name
> stores.my-store.changelog = my-changelog-topic
> {code}
> Expected changelog topic name = my.changelog.topic.name
> Actual changelog topic name = my-changelog-topic
> We should resolve the provided system.topic configuration value against the
> new streams.[streamId] based configurations.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (SAMZA-1317) Changelog topic configuration should accept streams.[streamId] configurations
[ https://issues.apache.org/jira/browse/SAMZA-1317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035489#comment-16035489 ]

ASF GitHub Bot commented on SAMZA-1317:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/samza/pull/212

> Changelog topic configuration should accept streams.[streamId] configurations
> ---
>
> Key: SAMZA-1317
> URL: https://issues.apache.org/jira/browse/SAMZA-1317
> Project: Samza
> Issue Type: Bug
> Reporter: Prateek Maheshwari
> Assignee: Jake Maes
> Fix For: 0.14.0
>
> Changelog topic configuration currently does not work if we try to use a
> streams.[streamId] based configuration for the topic name. E.g.,
> {code}
> streams.my-changelog-topic.samza.system = my-system
> streams.my-changelog-topic.samza.physical.name = my.changelog.topic.name
> stores.my-store.changelog = my-changelog-topic
> {code}
> Expected changelog topic name = my.changelog.topic.name
> Actual changelog topic name = my-changelog-topic
> We should resolve the provided system.topic configuration value against the
> new streams.[streamId] based configurations.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
samza git commit: SAMZA-1317: Changelog topic configuration should accept streams.[streamId] configurations
Repository: samza
Updated Branches:
  refs/heads/master e565856aa -> 373ae1c6d

SAMZA-1317: Changelog topic configuration should accept streams.[streamId] configurations

Author: Jacob Maes
Reviewers: Prateek Maheshwari

Closes #212 from jmakes/samza-1317-migration-fix

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/373ae1c6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/373ae1c6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/373ae1c6

Branch: refs/heads/master
Commit: 373ae1c6d014676c738f8abb52aa27e38db0f91c
Parents: e565856
Author: Jacob Maes
Authored: Fri Jun 2 14:53:31 2017 -0700
Committer: Jacob Maes
Committed: Fri Jun 2 14:53:31 2017 -0700

--
 .../samza/system/kafka/KafkaSystemAdmin.scala  |  8 +++-
 .../system/kafka/TestKafkaSystemAdminJava.java | 46
 2 files changed, 43 insertions(+), 11 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/samza/blob/373ae1c6/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
--
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 8c90c6c..2f82754 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -35,6 +35,10 @@ import scala.collection.JavaConverters._


 object KafkaSystemAdmin extends Logging {
+  // Use a dummy string for the stream id. The physical name and partition count are all that matter for changelog creation, so the dummy string should not be used.
+  // We cannot use the topic name, as it may include special chars which are not allowed in stream IDs. See SAMZA-1317
+  val CHANGELOG_STREAMID = "unused-temp-changelog-stream-id"
+
   /**
    * A helper method that takes oldest, newest, and upcoming offsets for each
    * system stream partition, and creates a single map from stream name to
@@ -490,10 +494,10 @@ class KafkaSystemAdmin(
   class KafkaChangelogException(s: String, t: Throwable) extends SamzaException(s, t) {
     def this(s: String) = this(s, null)
   }
-
+
   override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = {
     val topicMeta = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName))
-    val spec = new KafkaStreamSpec(topicName, topicName, systemName, numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps)
+    val spec = new KafkaStreamSpec(CHANGELOG_STREAMID, topicName, systemName, numKafkaChangelogPartitions, topicMeta.replicationFactor, topicMeta.kafkaProps)

     if (createStream(spec)) {
       info("Created changelog stream %s." format topicName)

http://git-wip-us.apache.org/repos/asf/samza/blob/373ae1c6/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
--
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index f5bc73a..a47ba9d 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -19,6 +19,9 @@

 package org.apache.samza.system.kafka;

+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
@@ -27,13 +30,7 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;

-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;


 public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
@@ -67,7 +64,38 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
     changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));

     SystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap)));
-    StreamSpec spec = new StreamSpec(STREAM, STREAM, SYSTEM(), PARTITIONS);
+    StreamSpec spec = new
[jira] [Commented] (SAMZA-1317) Changelog topic configuration should accept streams.[streamId] configurations
[ https://issues.apache.org/jira/browse/SAMZA-1317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035476#comment-16035476 ]

ASF GitHub Bot commented on SAMZA-1317:
---

GitHub user jmakes opened a pull request:

    https://github.com/apache/samza/pull/212

    SAMZA-1317: Changelog topic configuration should accept streams.[streamId] configurations

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jmakes/samza samza-1317-migration-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/samza/pull/212.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #212

commit ba05b9f24aac8f9cfc38b4805ced54722067e34d
Author: Jacob Maes
Date: 2017-06-02T21:42:45Z

    SAMZA-1317: Changelog topic configuration should accept streams.[streamId] configurations

commit 6ffb25fdfaf44294b756017c3220f215dde798b4
Author: Jacob Maes
Date: 2017-06-02T21:44:03Z

    Rm unnecessary whitespace

> Changelog topic configuration should accept streams.[streamId] configurations
> ---
>
> Key: SAMZA-1317
> URL: https://issues.apache.org/jira/browse/SAMZA-1317
> Project: Samza
> Issue Type: Bug
> Reporter: Prateek Maheshwari
> Assignee: Jake Maes
> Fix For: 0.14.0
>
> Changelog topic configuration currently does not work if we try to use a
> streams.[streamId] based configuration for the topic name. E.g.,
> {code}
> streams.my-changelog-topic.samza.system = my-system
> streams.my-changelog-topic.samza.physical.name = my.changelog.topic.name
> stores.my-store.changelog = my-changelog-topic
> {code}
> Expected changelog topic name = my.changelog.topic.name
> Actual changelog topic name = my-changelog-topic
> We should resolve the provided system.topic configuration value against the
> new streams.[streamId] based configurations.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (SAMZA-1271) Guarantee deterministic and predictable order for operator function initialization
[ https://issues.apache.org/jira/browse/SAMZA-1271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16034297#comment-16034297 ]

ASF GitHub Bot commented on SAMZA-1271:
---

GitHub user vjagadish1989 opened a pull request:

    https://github.com/apache/samza/pull/211

    SAMZA-1271: Guarantee predictable, deterministic order for operator initialization and finalization

    Currently, the order of initialization of operators in the Samza high level API is not deterministic. The non-determinism arises from two primary causes:
    - No fixed order of iteration for all subscribed `OperatorSpec`s for a given `MessageStream`
    - No fixed order of iteration for all the `OperatorImpl`s in the `OperatorImplGraph`

    We aim to provide the following 2 guarantees in this patch:
    For any 2 operators A, B in the graph, if B consumes the output of A:
    - A is initialized before B is initialized
    - A is finalized only after B is finalized

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vjagadish1989/samza deterministic_order

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/samza/pull/211.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #211

commit 95dde24595702748ac4d448fb528a3c3bb8c43da
Author: vjagadish1989
Date: 2017-06-02T07:29:03Z

    Guarantee deterministic order for operator initialization and finalization

> Guarantee deterministic and predictable order for operator function
> initialization
> ---
>
> Key: SAMZA-1271
> URL: https://issues.apache.org/jira/browse/SAMZA-1271
> Project: Samza
> Issue Type: Bug
> Reporter: Prateek Maheshwari
> Assignee: Prateek Maheshwari
> Fix For: 0.14.0
>
> Should also update InitableFunction documentation to clarify the
> initialization order.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
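The two ordering guarantees described in the pull request can be illustrated with a topological sort over the operator graph. This is a sketch of one way to obtain such an order, not the code from the patch: the class and method names (OperatorOrderSketch, initOrder, finalizeOrder) and the use of operator names as plain strings are assumptions. Insertion-ordered collections stand in for the "fixed order of iteration" the description calls for.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class OperatorOrderSketch {
    // consumers maps each operator to the operators that consume its output.
    // Returns an order in which every operator precedes all of its consumers.
    static List<String> initOrder(Map<String, List<String>> consumers) {
        // LinkedHashMap keeps insertion order, so the result is repeatable across runs.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : consumers.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String consumer : e.getValue()) {
                inDegree.merge(consumer, 1, Integer::sum);
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                ready.addLast(e.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String op = ready.removeFirst();
            order.add(op);
            for (String consumer : consumers.getOrDefault(op, Collections.emptyList())) {
                // A consumer becomes ready once all of its producers have been placed.
                if (inDegree.merge(consumer, -1, Integer::sum) == 0) {
                    ready.addLast(consumer);
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalStateException("operator graph contains a cycle");
        }
        return order;
    }

    // Finalization runs in the reverse of initialization order,
    // so a producer is finalized only after all of its consumers.
    static List<String> finalizeOrder(Map<String, List<String>> consumers) {
        List<String> order = new ArrayList<>(initOrder(consumers));
        Collections.reverse(order);
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = new LinkedHashMap<>();
        graph.put("source", List.of("map", "filter"));
        graph.put("map", List.of("join"));
        graph.put("filter", List.of("join"));
        graph.put("join", List.of());
        System.out.println(initOrder(graph));     // [source, map, filter, join]
        System.out.println(finalizeOrder(graph)); // [join, filter, map, source]
    }
}
```

Reversing the initialization order for finalization is the simplest way to satisfy the second guarantee once the first holds.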