merge

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9217644e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9217644e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9217644e

Branch: refs/heads/NewKafkaSystemConsumer
Commit: 9217644ec05cc9dfe0140b5ee488fcea2fed83b9
Parents: 0b6768f 728dc18
Author: Boris S <[email protected]>
Authored: Fri Sep 7 16:00:02 2018 -0700
Committer: Boris S <[email protected]>
Committed: Fri Sep 7 16:00:02 2018 -0700

----------------------------------------------------------------------
 .../application/ApplicationDescriptor.java      |  80 +++
 .../samza/application/SamzaApplication.java     |  40 ++
 .../samza/application/StreamApplication.java    |  75 +--
 .../StreamApplicationDescriptor.java            | 113 ++++
 .../samza/application/TaskApplication.java      |  86 +++
 .../application/TaskApplicationDescriptor.java  |  64 ++
 .../java/org/apache/samza/config/Config.java    |   3 +-
 .../samza/metrics/MetricsReporterFactory.java   |   5 +-
 .../apache/samza/operators/MessageStream.java   |   9 +-
 .../org/apache/samza/operators/StreamGraph.java | 120 ----
 .../operators/functions/ClosableFunction.java   |   7 +-
 .../operators/functions/InitableFunction.java   |   6 +-
 .../operators/functions/StreamExpander.java     |  16 +-
 .../apache/samza/runtime/ApplicationRunner.java |  92 +--
 .../samza/runtime/ApplicationRunners.java       |  82 +++
 .../apache/samza/runtime/ProcessorContext.java  |  31 +
 .../runtime/ProcessorLifecycleListener.java     |  55 ++
 .../ProcessorLifecycleListenerFactory.java      |  40 ++
 .../samza/task/AsyncStreamTaskFactory.java      |  10 +-
 .../apache/samza/task/StreamTaskFactory.java    |   6 +-
 .../java/org/apache/samza/task/TaskFactory.java |  38 ++
 .../samza/runtime/TestApplicationRunners.java   |  88 +++
 .../application/ApplicationDescriptorImpl.java  | 179 ++++++
 .../application/ApplicationDescriptorUtil.java  |  51 ++
 .../samza/application/ApplicationUtil.java      |  63 ++
 .../application/LegacyTaskApplication.java      |  37 ++
 .../StreamApplicationDescriptorImpl.java        | 381 ++++++++++++
 .../TaskApplicationDescriptorImpl.java          | 129 ++++
 .../samza/container/SamzaContainerListener.java |  22 +-
 .../samza/execution/ExecutionPlanner.java       |   7 +-
 .../org/apache/samza/execution/JobGraph.java    |   6 -
 .../org/apache/samza/execution/JobPlanner.java  | 188 ++++++
 .../apache/samza/execution/LocalJobPlanner.java | 134 +++++
 .../samza/execution/RemoteJobPlanner.java       |  96 +++
 .../samza/operators/MessageStreamImpl.java      |  57 +-
 .../samza/operators/OperatorSpecGraph.java      |  26 +-
 .../apache/samza/operators/StreamGraphSpec.java | 336 -----------
 .../samza/operators/spec/OperatorSpec.java      |   2 +-
 .../stream/IntermediateMessageStreamImpl.java   |   6 +-
 .../apache/samza/processor/StreamProcessor.java | 122 ++--
 .../StreamProcessorLifecycleListener.java       |  49 --
 .../runtime/AbstractApplicationRunner.java      | 135 -----
 .../samza/runtime/ApplicationRunnerMain.java    |  42 +-
 .../samza/runtime/LocalApplicationRunner.java   | 355 ++++-------
 .../samza/runtime/LocalContainerRunner.java     |  56 +-
 .../samza/runtime/RemoteApplicationRunner.java  | 123 ++--
 .../apache/samza/task/StreamOperatorTask.java   |   5 +-
 .../org/apache/samza/task/TaskFactoryUtil.java  | 137 ++---
 .../apache/samza/container/SamzaContainer.scala |  16 +-
 .../scala/org/apache/samza/job/JobRunner.scala  |   2 -
 .../samza/job/local/ThreadJobFactory.scala      |  49 +-
 .../application/MockStreamApplication.java      |  29 +
 .../samza/application/TestApplicationUtil.java  |  96 +++
 .../TestStreamApplicationDescriptorImpl.java    | 584 +++++++++++++++++++
 .../TestTaskApplicationDescriptorImpl.java      | 144 +++++
 .../samza/execution/TestExecutionPlanner.java   | 192 +++---
 .../execution/TestJobGraphJsonGenerator.java    | 120 ++--
 .../org/apache/samza/execution/TestJobNode.java |  53 +-
 .../samza/execution/TestLocalJobPlanner.java    | 211 +++++++
 .../samza/execution/TestRemoteJobPlanner.java   |  88 +++
 .../samza/operators/TestJoinOperator.java       | 103 ++--
 .../samza/operators/TestMessageStreamImpl.java  |  29 +-
 .../samza/operators/TestOperatorSpecGraph.java  |  19 +-
 .../samza/operators/TestStreamGraphSpec.java    | 506 ----------------
 .../operators/impl/TestOperatorImplGraph.java   | 190 +++---
 .../operators/impl/TestWindowOperator.java      | 147 ++---
 .../spec/TestPartitionByOperatorSpec.java       |  70 ++-
 .../samza/processor/TestStreamProcessor.java    | 139 +++--
 .../runtime/TestApplicationRunnerMain.java      |  47 +-
 .../runtime/TestLocalApplicationRunner.java     | 311 +++-------
 .../runtime/TestRemoteApplicationRunner.java    |  35 +-
 .../apache/samza/task/MockAsyncStreamTask.java  |  31 +
 .../org/apache/samza/task/MockStreamTask.java   |  31 +
 .../apache/samza/task/TestTaskFactoryUtil.java  | 215 ++-----
 .../samza/testUtils/TestAsyncStreamTask.java    |  35 --
 .../samza/testUtils/TestStreamApplication.java  |  33 --
 .../apache/samza/testUtils/TestStreamTask.java  |  34 --
 .../samza/container/TestSamzaContainer.scala    |  76 ++-
 .../samza/sql/runner/SamzaSqlApplication.java   |  13 +-
 .../sql/runner/SamzaSqlApplicationRunner.java   |  53 +-
 .../samza/sql/translator/JoinTranslator.java    |   2 +-
 .../samza/sql/translator/QueryTranslator.java   |  27 +-
 .../samza/sql/translator/ScanTranslator.java    |   8 +-
 .../samza/sql/translator/TranslatorContext.java |  19 +-
 .../apache/samza/sql/e2e/TestSamzaSqlTable.java |   8 +-
 .../runner/TestSamzaSqlApplicationRunner.java   |   2 -
 .../sql/translator/TestFilterTranslator.java    |   6 +-
 .../sql/translator/TestJoinTranslator.java      |  16 +-
 .../sql/translator/TestProjectTranslator.java   |  14 +-
 .../sql/translator/TestQueryTranslator.java     | 162 +++--
 .../example/AppWithGlobalConfigExample.java     |  25 +-
 .../apache/samza/example/BroadcastExample.java  |  22 +-
 .../samza/example/KeyValueStoreExample.java     |  19 +-
 .../org/apache/samza/example/MergeExample.java  |  18 +-
 .../samza/example/OrderShipmentJoinExample.java |  19 +-
 .../samza/example/PageViewCounterExample.java   |  15 +-
 .../samza/example/RepartitionExample.java       |  19 +-
 .../samza/example/TaskApplicationExample.java   |  77 +++
 .../org/apache/samza/example/WindowExample.java |  18 +-
 .../samza/system/mock/MockSystemConsumer.java   |   4 +-
 .../apache/samza/test/framework/TestRunner.java |  41 +-
 .../integration/LocalApplicationRunnerMain.java |  21 +-
 .../TestStandaloneIntegrationApplication.java   |   9 +-
 .../processor/TestZkStreamProcessorBase.java    |  20 +-
 .../EndOfStreamIntegrationTest.java             |  37 +-
 .../WatermarkIntegrationTest.java               |  62 +-
 .../test/framework/BroadcastAssertApp.java      |   7 +-
 .../StreamApplicationIntegrationTest.java       |   9 +-
 ...StreamApplicationIntegrationTestHarness.java |  42 +-
 .../samza/test/framework/TestTimerApp.java      |   7 +-
 .../apache/samza/test/framework/TimerTest.java  |  18 +-
 .../test/operator/RepartitionJoinWindowApp.java |  25 +-
 .../test/operator/RepartitionWindowApp.java     |  20 +-
 .../samza/test/operator/SessionWindowApp.java   |  17 +-
 .../operator/TestRepartitionJoinWindowApp.java  |  30 +-
 .../test/operator/TestRepartitionWindowApp.java |  10 +-
 .../samza/test/operator/TumblingWindowApp.java  |  16 +-
 .../test/processor/TestStreamApplication.java   |  82 +--
 .../test/processor/TestStreamProcessor.java     |  18 +-
 .../processor/TestZkLocalApplicationRunner.java | 317 +++++-----
 .../apache/samza/test/table/TestLocalTable.java |  39 +-
 .../table/TestLocalTableWithSideInputs.java     |  13 +-
 .../samza/test/table/TestRemoteTable.java       |  27 +-
 .../benchmark/SystemConsumerWithSamzaBench.java |  14 +-
 124 files changed, 5280 insertions(+), 3632 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9217644e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/9217644e/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --cc samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 0d71303,abd7f65..bec4ec0
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@@ -19,17 -19,19 +19,18 @@@
  
  package org.apache.samza.job.local
  
+ import org.apache.samza.application.{ApplicationDescriptorUtil, ApplicationUtil}
 -import org.apache.samza.config.{Config, TaskConfigJava}
  import org.apache.samza.config.JobConfig._
  import org.apache.samza.config.ShellCommandConfig._
 +import org.apache.samza.config.{Config, TaskConfigJava}
  import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, TaskName}
  import org.apache.samza.coordinator.JobModelManager
  import org.apache.samza.coordinator.stream.CoordinatorStreamManager
  import org.apache.samza.job.{StreamJob, StreamJobFactory}
  import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap, MetricsReporter}
- import org.apache.samza.operators.StreamGraphSpec
+ import org.apache.samza.runtime.ProcessorContext
  import org.apache.samza.storage.ChangelogStreamManager
 -import org.apache.samza.task.TaskFactory
--import org.apache.samza.task.TaskFactoryUtil
++import org.apache.samza.task.{TaskFactory, TaskFactoryUtil}
  import org.apache.samza.util.Logging
  
  import scala.collection.JavaConversions._
@@@ -72,32 -72,36 +73,36 @@@ class ThreadJobFactory extends StreamJo
  
      val containerId = "0"
      val jmxServer = new JmxServer
-     val streamApp = TaskFactoryUtil.createStreamApplication(config)
- 
-     val taskFactory = if (streamApp != null) {
-       val graphSpec = new StreamGraphSpec(config)
-       streamApp.init(graphSpec, config)
-       TaskFactoryUtil.createTaskFactory(graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager)
-     } else {
-       TaskFactoryUtil.createTaskFactory(config)
-     }
+ 
+     val appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config)
 -    val taskFactory : TaskFactory[_] = TaskFactoryUtil.getTaskFactory(appDesc)
++    val taskFactory: TaskFactory[_] = TaskFactoryUtil.getTaskFactory(appDesc)
  
      // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
      config.getTaskOpts match {
-       case Some(taskOpts) => warn("%s was specified in config, but is not being used because job is being executed with ThreadJob. You probably want to run %s=%s." format(TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName))
+       case Some(taskOpts) => warn("%s was specified in config, but is not being used because job is being executed with ThreadJob. " +
 -        "You probably want to run %s=%s." format (TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName))
++        "You probably want to run %s=%s." format(TASK_JVM_OPTS, STREAM_JOB_FACTORY_CLASS, classOf[ProcessJobFactory].getName))
        case _ => None
      }
  
-     val containerListener = new SamzaContainerListener {
-       override def onContainerFailed(t: Throwable): Unit = {
-         error("Container failed.", t)
-         throw t
-       }
- 
-       override def onContainerStop(): Unit = {
-       }
- 
-       override def onContainerStart(): Unit = {
+     val containerListener = {
 -      val processorLifecycleListener = appDesc.getProcessorLifecycleListenerFactory().createInstance(new ProcessorContext() { }, config)
++      val processorLifecycleListener = appDesc.getProcessorLifecycleListenerFactory().createInstance(new ProcessorContext() {}, config)
+       new SamzaContainerListener {
+         override def afterFailure(t: Throwable): Unit = {
+           processorLifecycleListener.afterFailure(t)
+           throw t
+         }
+ 
+         override def afterStart(): Unit = {
+           processorLifecycleListener.afterStart()
+         }
+ 
+         override def afterStop(): Unit = {
+           processorLifecycleListener.afterStop()
+         }
+ 
+         override def beforeStart(): Unit = {
+           processorLifecycleListener.beforeStart()
+         }
  
        }
      }

Reply via email to