samza git commit: SAMZA-1539: KafkaProducer potential hang on close() when task.drop.producer.errors==true

2017-12-22 Thread jmakes
Repository: samza
Updated Branches:
  refs/heads/master 29ecae891 -> ef1c9625c


SAMZA-1539: KafkaProducer potential hang on close() when task.drop.producer.errors==true

Author: Jacob Maes 

Reviewers: Boris Shkolnik 

Closes #390 from jmakes/samza-1539


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ef1c9625
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ef1c9625
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ef1c9625

Branch: refs/heads/master
Commit: ef1c9625c63b6c169585c5d1928298f36f3037fb
Parents: 29ecae8
Author: Jacob Maes 
Authored: Fri Dec 22 10:36:37 2017 -0800
Committer: Jacob Maes <--global>
Committed: Fri Dec 22 10:36:37 2017 -0800

--
 .../system/kafka/KafkaSystemProducer.scala  | 135 ++-
 .../system/kafka/TestKafkaSystemProducer.scala  |  39 --
 2 files changed, 103 insertions(+), 71 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/ef1c9625/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
--
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index 5e83666..9eaf895 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -28,7 +28,6 @@ import org.apache.kafka.clients.producer.Producer
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.producer.RecordMetadata
 import org.apache.kafka.common.PartitionInfo
-import org.apache.kafka.common.errors.SerializationException
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.system.SystemProducer
 import org.apache.samza.system.SystemProducerException
@@ -46,32 +45,30 @@ class KafkaSystemProducer(systemName: String,
 
   // Represents a fatal error that caused the producer to close.
   val fatalException: AtomicReference[SystemProducerException] = new 
AtomicReference[SystemProducerException]()
-  @volatile var producer: Producer[Array[Byte], Array[Byte]] = null
-  val producerLock: Object = new Object
+  val producerRef: AtomicReference[Producer[Array[Byte], Array[Byte]]] = new 
AtomicReference[Producer[Array[Byte], Array[Byte]]]()
+  val producerCreationLock: Object = new Object
+  @volatile var stopped = false
 
   def start(): Unit = {
-producer = getProducer()
+producerRef.set(getProducer())
   }
 
   def stop() {
 info("Stopping producer for system: " + this.systemName)
 
-// stop() should not happen often so no need to optimize locking
-producerLock.synchronized {
-  try {
-if (producer != null) {
-  producer.close  // Also performs the equivalent of a flush()
-}
+stopped = true
+val currentProducer = producerRef.getAndSet(null)
+try {
+  if (currentProducer != null) {
+currentProducer.close // Also performs the equivalent of a flush()
+  }
 
-val exception = fatalException.get()
-if (exception != null) {
-  error("Observed an earlier send() error while closing producer", 
exception)
-}
-  } catch {
-case e: Exception => error("Error while closing producer for system: " 
+ systemName, e)
-  } finally {
-producer = null
+  val exception = fatalException.get()
+  if (exception != null) {
+error("Observed an earlier send() error while closing producer", 
exception)
   }
+} catch {
+  case e: Exception => error("Error while closing producer for system: " + 
systemName, e)
 }
   }
 
@@ -82,7 +79,7 @@ class KafkaSystemProducer(systemName: String,
 trace("Enqueuing message: %s, %s." format (source, envelope))
 
 val topicName = envelope.getSystemStream.getStream
-if (topicName == null || topicName == "") {
+if (topicName == null || topicName.isEmpty) {
   throw new IllegalArgumentException("Invalid system stream: " + 
envelope.getSystemStream)
 }
 
@@ -92,10 +89,7 @@ class KafkaSystemProducer(systemName: String,
   throw new SystemProducerException("Producer was unable to recover from 
previous exception.", globalProducerException)
 }
 
-val currentProducer = producer
-if (currentProducer == null) {
-  throw new SystemProducerException("Kafka producer is null.")
-}
+val currentProducer = getOrCreateCurrentProducer
 
 // Java-based Kafka producer API requires an "Integer" type partitionKey 
and does not allow custom overriding of Partitioners
 // Any kind of custom partitioning has to be done on the client-side
@@ -115,7 +109,7 @@ cl

[jira] [Commented] (SAMZA-1539) KafkaProducer potential hang on close() when task.drop.producer.errors==true

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/SAMZA-1539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301783#comment-16301783
 ] 

ASF GitHub Bot commented on SAMZA-1539:
---

Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/390


> KafkaProducer potential hang on close() when task.drop.producer.errors==true
> 
>
> Key: SAMZA-1539
> URL: https://issues.apache.org/jira/browse/SAMZA-1539
> Project: Samza
>  Issue Type: Bug
>Reporter: Jake Maes
>Assignee: Jake Maes
> Fix For: 0.15.0
>
>
> The issue is caused by 2 bad behaviors:
> 1. KafkaProducer.close() does an unbounded thread.join() even when a force 
> close (timeout == 0 ms) is specified. 
> 2. KafkaSystemProducer.handleSendException() has a synchronized block to 
> recreate the producer when task.drop.producer.errors==true. When the IO 
> thread is waiting on the synchronized block, it prevents the KafkaProducer 
> from joining the IO thread.
> Fixing either one will fix the issue, but we will close this ticket as soon 
> as the Samza side is tested and committed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (SAMZA-1539) KafkaProducer potential hang on close() when task.drop.producer.errors==true

2017-12-22 Thread Jake Maes (JIRA)

 [ 
https://issues.apache.org/jira/browse/SAMZA-1539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jake Maes resolved SAMZA-1539.
--
Resolution: Fixed

Issue resolved by pull request 390
[https://github.com/apache/samza/pull/390]

> KafkaProducer potential hang on close() when task.drop.producer.errors==true
> 
>
> Key: SAMZA-1539
> URL: https://issues.apache.org/jira/browse/SAMZA-1539
> Project: Samza
>  Issue Type: Bug
>Reporter: Jake Maes
>Assignee: Jake Maes
> Fix For: 0.15.0
>
>
> The issue is caused by 2 bad behaviors:
> 1. KafkaProducer.close() does an unbounded thread.join() even when a force 
> close (timeout == 0 ms) is specified. 
> 2. KafkaSystemProducer.handleSendException() has a synchronized block to 
> recreate the producer when task.drop.producer.errors==true. When the IO 
> thread is waiting on the synchronized block, it prevents the KafkaProducer 
> from joining the IO thread.
> Fixing either one will fix the issue, but we will close this ticket as soon 
> as the Samza side is tested and committed.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[1/2] samza git commit: Documentation for Samza SQL

2017-12-22 Thread xinyu
Repository: samza
Updated Branches:
  refs/heads/0.14.0 ef506a3c5 -> 804aea2b9


http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/src/main/java/org/apache/samza/tools/schemas/ProfileChangeEvent.avsc
--
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/schemas/ProfileChangeEvent.avsc
 
b/samza-tools/src/main/java/org/apache/samza/tools/schemas/ProfileChangeEvent.avsc
new file mode 100644
index 000..5c1e49d
--- /dev/null
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/schemas/ProfileChangeEvent.avsc
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+{
+"name": "ProfileChangeEvent",
+"version" : 1,
+"namespace": "com.linkedin.samza.tools.avro",
+"type": "record",
+"fields": [
+{
+"name": "Name",
+"doc": "Name of the profile.",
+"type": ["null", "string"],
+"default":null
+},
+{
+"name": "NewCompany",
+"doc": "Name of the new company the person joined.",
+"type": ["null", "string"],
+"default":null
+},
+{
+"name": "OldCompany",
+"doc": "Name of the old company the person was working.",
+"type": ["null", "string"],
+"default":null
+},
+{
+"name": "ProfileChangeTimestamp",
+"doc": "Time at which the profile was changed.",
+"type": ["null", "long"],
+"default":null
+}
+]
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/samza-tools/src/main/java/org/apache/samza/tools/schemas/ProfileChangeEvent.java
--
diff --git 
a/samza-tools/src/main/java/org/apache/samza/tools/schemas/ProfileChangeEvent.java
 
b/samza-tools/src/main/java/org/apache/samza/tools/schemas/ProfileChangeEvent.java
new file mode 100644
index 000..c8e951c
--- /dev/null
+++ 
b/samza-tools/src/main/java/org/apache/samza/tools/schemas/ProfileChangeEvent.java
@@ -0,0 +1,60 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+/**
+ * Autogenerated by Avro
+ *
+ * DO NOT EDIT DIRECTLY
+ */
+package org.apache.samza.tools.schemas;
+
+@SuppressWarnings("all")
+public class ProfileChangeEvent extends 
org.apache.avro.specific.SpecificRecordBase implements 
org.apache.avro.specific.SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = 
org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ProfileChangeEvent\",\"namespace\":\"com.linkedin.samza.tools.avro\",\"fields\":[{\"name\":\"Name\",\"type\":[\"null\",\"string\"],\"doc\":\"Name
 of the 
profile.\",\"default\":null},{\"name\":\"NewCompany\",\"type\":[\"null\",\"string\"],\"doc\":\"Name
 of the new company the person 
joined.\",\"default\":null},{\"name\":\"OldCompany\",\"type\":[\"null\",\"string\"],\"doc\":\"Name
 of the old company the person was 
working.\",\"default\":null},{\"name\":\"ProfileChangeTimestamp\",\"type\":[\"null\",\"long\"],\"doc\":\"Time
 at which the profile was changed.\",\"default\":null}]}");
+  /** Name of the profile. */
+  public java.lang.CharSequence Name;
+  /** Name of the new company the person joined. */
+  public java.lang.CharSequence NewCompany;
+  /** Name of the old company the person was working. */
+  public java.lang.CharSequence OldCompany;
+  /** Time at whi

[2/2] samza git commit: Documentation for Samza SQL

2017-12-22 Thread xinyu
Documentation for Samza SQL

**Samza tools**:
Contains the following tools, which can be used to experiment with Samza SQL or 
any other Samza job.

1. Generate Kafka events: Tool used to generate Avro-serialized Kafka events.
2. Event Hubs consumer: Tool used to consume events from an Event Hubs topic. 
This can be used if the Samza job writes events to Event Hubs.
3. Samza SQL console: Tool used to execute SQL using Samza SQL.

Adds documentation on how to use Samza SQL on a local machine and in a YARN 
environment, along with the associated Samza tooling.

https://issues.apache.org/jira/browse/SAMZA-1526

Author: Srinivasulu Punuru 

Reviewers: Yi Pan, Jagadish

Closes #374 from srinipunuru/docs.1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/804aea2b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/804aea2b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/804aea2b

Branch: refs/heads/0.14.0
Commit: 804aea2b957fbd30a2b79d5a07fbb168e2871244
Parents: ef506a3
Author: Srinivasulu Punuru 
Authored: Tue Dec 12 10:46:01 2017 -0800
Committer: xiliu 
Committed: Fri Dec 22 10:48:45 2017 -0800

--
 build.gradle|  28 +++
 docs/README.md  |   4 +-
 docs/learn/tutorials/versioned/index.md |   3 +
 docs/learn/tutorials/versioned/samza-sql.md | 123 +++
 docs/learn/tutorials/versioned/samza-tools.md   | 109 ++
 docs/startup/download/index.md  |   7 +
 gradle/dependency-versions.gradle   |   1 +
 .../apache/samza/sql/avro/AvroRelConverter.java |   6 +-
 samza-tools/config/eh-consumer-log4j.xml|  35 
 .../config/generate-kafka-events-log4j.xml  |  35 
 samza-tools/config/samza-sql-console-log4j.xml  |  35 
 samza-tools/scripts/eh-consumer.sh  |  34 +++
 samza-tools/scripts/generate-kafka-events.sh|  34 +++
 samza-tools/scripts/samza-sql-console.sh|  34 +++
 .../apache/samza/tools/CommandLineHelper.java   |  42 
 .../tools/ConsoleLoggingSystemFactory.java  | 126 
 .../samza/tools/EventHubConsoleConsumer.java| 120 +++
 .../apache/samza/tools/GenerateKafkaEvents.java | 205 +++
 .../samza/tools/RandomValueGenerator.java   |  87 
 .../org/apache/samza/tools/SamzaSqlConsole.java | 188 +
 .../tools/avro/AvroSchemaGenRelConverter.java   |  94 +
 .../avro/AvroSchemaGenRelConverterFactory.java  |  43 
 .../samza/tools/avro/AvroSerDeFactory.java  |  96 +
 .../tools/json/JsonRelConverterFactory.java |  93 +
 .../samza/tools/schemas/PageViewEvent.avsc  |  51 +
 .../samza/tools/schemas/PageViewEvent.java  |  60 ++
 .../samza/tools/schemas/ProfileChangeEvent.avsc |  51 +
 .../samza/tools/schemas/ProfileChangeEvent.java |  60 ++
 .../apache/samza/tools/udf/RegexMatchUdf.java   |  40 
 samza-tools/src/main/resources/log4j.xml|  43 
 settings.gradle |   3 +-
 31 files changed, 1886 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/build.gradle
--
diff --git a/build.gradle b/build.gradle
index 50cc5e0..860316e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -328,6 +328,34 @@ project(':samza-sql') {
   }
 }
 
+project(':samza-tools') {
+  apply plugin: 'java'
+
+  dependencies {
+compile project(':samza-sql')
+compile project(':samza-api')
+compile project(':samza-azure')
+compile "log4j:log4j:$log4jVersion"
+compile "org.slf4j:slf4j-api:$slf4jVersion"
+compile "org.slf4j:slf4j-log4j12:$slf4jVersion"
+compile "commons-cli:commons-cli:$commonsCliVersion"
+compile "org.apache.avro:avro:$avroVersion"
+compile "org.apache.commons:commons-lang3:$commonsLang3Version"
+compile "org.apache.kafka:kafka-clients:$kafkaVersion"
+  }
+
+  tasks.create(name: "releaseToolsTarGz", dependsOn: 
configurations.archives.artifacts, type: Tar) {
+into "samza-tools-${version}"
+compression = Compression.GZIP
+from(project.file("./scripts")) { into "scripts/" }
+from(project.file("./config")) { into "config/" }
+from(project(':samza-shell').file("src/main/bash/run-class.sh")) { into 
"scripts/" }
+from(configurations.runtime) { into("lib/") }
+from(configurations.archives.artifacts.files) { into("lib/") }
+duplicatesStrategy 'exclude'
+  }
+}
+
 project(":samza-kafka_$scalaVersion") {
   apply plugin: 'scala'
 

http://git-wip-us.apache.org/repos/asf/samza/blob/804aea2b/docs/README.md
--
diff --git a/docs/README.md b/docs/README.md
index 21a4991..3de2a78 1006

samza git commit: Fix a couple sonarcloud issues with samza-1537

2017-12-22 Thread jmakes
Repository: samza
Updated Branches:
  refs/heads/master ef1c9625c -> 9a8099cda


Fix a couple sonarcloud issues with samza-1537

Author: Jacob Maes 

Reviewers: Jagadish 

Closes #393 from jmakes/streamappender-sonarcloud


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9a8099cd
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9a8099cd
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9a8099cd

Branch: refs/heads/master
Commit: 9a8099cdaa2a8ee6fa64d263ebf6704ee5a108da
Parents: ef1c962
Author: Jacob Maes 
Authored: Fri Dec 22 11:41:27 2017 -0800
Committer: Jacob Maes <--global>
Committed: Fri Dec 22 11:41:27 2017 -0800

--
 .../java/org/apache/samza/logging/log4j/StreamAppender.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/9a8099cd/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
--
diff --git 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index 0ea8b68..5f41959 100644
--- 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -152,7 +152,7 @@ public class StreamAppender extends AppenderSkeleton {
 // Emit a metric which can be monitored to ensure it doesn't 
happen often.
 metrics.logMessagesDropped.inc(messagesDropped);
   }
-  metrics.bufferFillPct.set(Math.round(100 * logQueue.size() / 
DEFAULT_QUEUE_SIZE));
+  metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / 
DEFAULT_QUEUE_SIZE));
 }
   } catch (Exception e) {
 System.err.println("[StreamAppender] Error sending log message:");
@@ -188,7 +188,8 @@ public class StreamAppender extends AppenderSkeleton {
   try {
 transferThread.join();
   } catch (InterruptedException e) {
-log.error("Interrupted while waiting for sink thread to finish.", e);
+log.error("Interrupted while waiting for transfer thread to finish.", 
e);
+Thread.currentThread().interrupt();
   }
 
   flushSystemProducer();



[samza] Git Push Summary

2017-12-22 Thread xinyu
Repository: samza
Updated Tags:  refs/tags/release-0.14.0-rc4 [created] c0ffae6f6


[jira] [Created] (SAMZA-1547) Fix default value for the grouper-factory configuration in KafkaCheckpointManager

2017-12-22 Thread Jagadish (JIRA)
Jagadish created SAMZA-1547:
---

 Summary: Fix default value for the grouper-factory configuration 
in KafkaCheckpointManager
 Key: SAMZA-1547
 URL: https://issues.apache.org/jira/browse/SAMZA-1547
 Project: Samza
  Issue Type: Bug
Reporter: Jagadish






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (SAMZA-1547) Fix default value for the grouper-factory configuration in KafkaCheckpointManager

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/SAMZA-1547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301951#comment-16301951
 ] 

ASF GitHub Bot commented on SAMZA-1547:
---

GitHub user vjagadish1989 opened a pull request:

https://github.com/apache/samza/pull/394

SAMZA-1547: Parse default value grouper-factory config in KafkaCheckpointMgr

- Additionally, updated unit-tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vjagadish1989/samza kcm-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/samza/pull/394.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #394


commit 3a08518509c311397e23692e9bb1bad037c96a68
Author: Jagadish 
Date:   2017-12-22T20:49:50Z

SAMZA-1547: Fix default handling of grouper-factory configuration in 
KafkaCheckpointManager




> Fix default value for the grouper-factory configuration in 
> KafkaCheckpointManager
> -
>
> Key: SAMZA-1547
> URL: https://issues.apache.org/jira/browse/SAMZA-1547
> Project: Samza
>  Issue Type: Bug
>Reporter: Jagadish
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


samza git commit: SAMZA-1547; Parse default value grouper-factory config in KafkaCheckpointMgr

2017-12-22 Thread jagadish
Repository: samza
Updated Branches:
  refs/heads/master 9a8099cda -> 93219c78d


SAMZA-1547; Parse default value grouper-factory config in KafkaCheckpointMgr

- Additionally, updated all unit-tests.

Author: Jagadish 

Reviewers: Prateek M 

Closes #394 from vjagadish1989/kcm-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/93219c78
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/93219c78
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/93219c78

Branch: refs/heads/master
Commit: 93219c78d6f473d2c7ad4d8702a25f704ee61a7a
Parents: 9a8099c
Author: Jagadish 
Authored: Fri Dec 22 13:58:33 2017 -0800
Committer: Jagadish 
Committed: Fri Dec 22 13:58:33 2017 -0800

--
 .../org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala | 2 +-
 .../apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/93219c78/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
--
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 217b2b6..e1187c5 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -59,7 +59,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
   val checkpointTopic: String = checkpointSpec.getPhysicalName
   val checkpointSsp = new SystemStreamPartition(checkpointSystem, 
checkpointTopic, new Partition(0))
   val checkpointKeySerde = new KafkaCheckpointLogKeySerde
-  val expectedGrouperFactory = config.get(JobConfig.SSP_GROUPER_FACTORY)
+  val expectedGrouperFactory = new 
JobConfig(config).getSystemStreamPartitionGrouperFactory
 
   val systemProducer = systemFactory.getProducer(checkpointSystem, config, 
metricsRegistry)
   val systemConsumer = systemFactory.getConsumer(checkpointSystem, config, 
metricsRegistry)

http://git-wip-us.apache.org/repos/asf/samza/blob/93219c78/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
--
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index dcf4068..ec9f3a0 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -150,7 +150,6 @@ class TestKafkaCheckpointManager extends 
KafkaServerTestHarness {
 new MapConfig(new ImmutableMap.Builder[String, String]()
   .put(JobConfig.JOB_NAME, "some-job-name")
   .put(JobConfig.JOB_ID, "i001")
-  .put(JobConfig.SSP_GROUPER_FACTORY, sspGrouperFactoryName)
   .put(s"systems.$checkpointSystemName.samza.factory", 
classOf[KafkaSystemFactory].getCanonicalName)
   .put(s"systems.$checkpointSystemName.producer.bootstrap.servers", 
brokers)
   .put(s"systems.$checkpointSystemName.consumer.zookeeper.connect", 
zkConnect)



samza git commit: SAMZA-1547; Parse default value grouper-factory config in KafkaCheckpointMgr

2017-12-22 Thread jagadish
Repository: samza
Updated Branches:
  refs/heads/0.14.0 804aea2b9 -> 7b42474d3


SAMZA-1547; Parse default value grouper-factory config in KafkaCheckpointMgr

- Additionally, updated all unit-tests.

Author: Jagadish 

Reviewers: Prateek M 

Closes #394 from vjagadish1989/kcm-fix

(cherry picked from commit 93219c78d6f473d2c7ad4d8702a25f704ee61a7a)
Signed-off-by: Jagadish 


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7b42474d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7b42474d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7b42474d

Branch: refs/heads/0.14.0
Commit: 7b42474d33f96eebb81ad2542d0f2297d1d51d34
Parents: 804aea2
Author: Jagadish 
Authored: Fri Dec 22 13:58:33 2017 -0800
Committer: Jagadish 
Committed: Fri Dec 22 13:59:31 2017 -0800

--
 .../org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala | 2 +-
 .../apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/samza/blob/7b42474d/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
--
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 217b2b6..e1187c5 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -59,7 +59,7 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
   val checkpointTopic: String = checkpointSpec.getPhysicalName
   val checkpointSsp = new SystemStreamPartition(checkpointSystem, 
checkpointTopic, new Partition(0))
   val checkpointKeySerde = new KafkaCheckpointLogKeySerde
-  val expectedGrouperFactory = config.get(JobConfig.SSP_GROUPER_FACTORY)
+  val expectedGrouperFactory = new 
JobConfig(config).getSystemStreamPartitionGrouperFactory
 
   val systemProducer = systemFactory.getProducer(checkpointSystem, config, 
metricsRegistry)
   val systemConsumer = systemFactory.getConsumer(checkpointSystem, config, 
metricsRegistry)

http://git-wip-us.apache.org/repos/asf/samza/blob/7b42474d/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
--
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index dcf4068..ec9f3a0 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -150,7 +150,6 @@ class TestKafkaCheckpointManager extends 
KafkaServerTestHarness {
 new MapConfig(new ImmutableMap.Builder[String, String]()
   .put(JobConfig.JOB_NAME, "some-job-name")
   .put(JobConfig.JOB_ID, "i001")
-  .put(JobConfig.SSP_GROUPER_FACTORY, sspGrouperFactoryName)
   .put(s"systems.$checkpointSystemName.samza.factory", 
classOf[KafkaSystemFactory].getCanonicalName)
   .put(s"systems.$checkpointSystemName.producer.bootstrap.servers", 
brokers)
   .put(s"systems.$checkpointSystemName.consumer.zookeeper.connect", 
zkConnect)



[jira] [Commented] (SAMZA-1547) Fix default value for the grouper-factory configuration in KafkaCheckpointManager

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/SAMZA-1547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301990#comment-16301990
 ] 

ASF GitHub Bot commented on SAMZA-1547:
---

Github user asfgit closed the pull request at:

https://github.com/apache/samza/pull/394


> Fix default value for the grouper-factory configuration in 
> KafkaCheckpointManager
> -
>
> Key: SAMZA-1547
> URL: https://issues.apache.org/jira/browse/SAMZA-1547
> Project: Samza
>  Issue Type: Bug
>Reporter: Jagadish
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[samza] Git Push Summary

2017-12-22 Thread xinyu
Repository: samza
Updated Tags:  refs/tags/release-0.14.0-rc5 [created] d49606bfc