jstorm-runner: fix checkstyles.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aa251a4a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aa251a4a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aa251a4a Branch: refs/heads/jstorm-runner Commit: aa251a4a4d2850310f5dfd9db4d605cce41bba13 Parents: f3df3a2 Author: Pei He <[email protected]> Authored: Thu Jul 13 17:37:51 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Aug 19 12:02:47 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/jstorm/JStormRunner.java | 395 +++++------ .../runners/jstorm/JStormRunnerRegistrar.java | 39 +- .../beam/runners/jstorm/JStormRunnerResult.java | 118 ++-- .../beam/runners/jstorm/TestJStormRunner.java | 188 +++--- .../serialization/ImmutableListSerializer.java | 152 +++-- .../serialization/ImmutableMapSerializer.java | 78 ++- .../serialization/ImmutableSetSerializer.java | 93 +-- .../KvStoreIterableSerializer.java | 73 +- .../SdkRepackImmuListSerializer.java | 116 ++-- .../SdkRepackImmuSetSerializer.java | 98 +-- .../UnmodifiableCollectionsSerializer.java | 290 ++++---- .../translation/StormPipelineTranslator.java | 273 ++++---- .../jstorm/translation/TranslationContext.java | 667 ++++++++++--------- .../jstorm/translation/TranslatorRegistry.java | 65 +- .../translation/runtime/AbstractComponent.java | 66 +- .../translation/runtime/AdaptorBasicBolt.java | 2 +- .../translation/runtime/AdaptorBasicSpout.java | 2 +- .../translation/runtime/DoFnExecutor.java | 511 +++++++------- .../runtime/DoFnRunnerWithMetrics.java | 3 +- .../jstorm/translation/runtime/Executor.java | 13 +- .../translation/runtime/ExecutorContext.java | 15 +- .../translation/runtime/ExecutorsBolt.java | 502 +++++++------- .../translation/runtime/FlattenExecutor.java | 61 +- .../runtime/GroupByWindowExecutor.java | 231 ++++--- .../runtime/MultiOutputDoFnExecutor.java | 79 ++- .../runtime/MultiStatefulDoFnExecutor.java | 64 +- .../runtime/StatefulDoFnExecutor.java | 63 +- .../translation/runtime/TimerService.java | 37 +- .../translation/runtime/TimerServiceImpl.java | 233 +++---- .../translation/runtime/TxExecutorsBolt.java | 193 +++--- .../runtime/TxUnboundedSourceSpout.java | 244 +++---- .../runtime/UnboundedSourceSpout.java | 288 ++++---- .../translation/runtime/ViewExecutor.java | 53 +- .../runtime/WindowAssignExecutor.java | 130 ++-- .../runtime/state/JStormBagState.java | 261 ++++---- .../runtime/state/JStormCombiningState.java | 98 +-- .../runtime/state/JStormMapState.java | 227 ++++--- .../runtime/state/JStormStateInternals.java | 290 ++++---- .../runtime/state/JStormValueState.java | 92 ++- .../runtime/state/JStormWatermarkHoldState.java | 88 +-- .../runtime/timer/JStormTimerInternals.java | 143 ++-- .../translator/BoundedSourceTranslator.java | 29 +- .../translator/CombineGloballyTranslator.java | 5 +- .../translator/CombinePerKeyTranslator.java | 5 +- .../translator/FlattenTranslator.java | 34 +- .../translator/GroupByKeyTranslator.java | 71 +- .../translator/ParDoBoundMultiTranslator.java | 143 ++-- .../translator/ParDoBoundTranslator.java | 128 ++-- .../translator/ReshuffleTranslator.java | 4 +- .../jstorm/translation/translator/Stream.java | 109 +-- .../translator/TransformTranslator.java | 74 +- .../translator/UnboundedSourceTranslator.java | 28 +- .../translation/translator/ViewTranslator.java | 586 ++++++++-------- .../translator/WindowAssignTranslator.java | 26 +- .../translator/WindowBoundTranslator.java | 26 +- .../jstorm/translation/util/CommonInstance.java | 6 +- .../util/DefaultSideInputReader.java | 33 +- .../translation/util/DefaultStepContext.java | 89 +-- .../beam/runners/jstorm/util/RunnerUtils.java | 46 +- .../jstorm/util/SerializedPipelineOptions.java | 51 +- .../jstorm/util/SingletonKeyedWorkItem.java | 3 +- .../jstorm/JStormRunnerRegistrarTest.java | 4 +- .../runtime/state/JStormStateInternalsTest.java | 345 +++++----- 63 files changed, 4314 insertions(+), 4165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java index 39c723b..5fdbe4d 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.jstorm; -import static com.google.common.base.Preconditions.checkNotNull; - import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; @@ -31,8 +29,6 @@ import backtype.storm.tuple.Fields; import com.alibaba.jstorm.cache.KvStoreIterable; import com.alibaba.jstorm.cluster.StormConfig; import com.alibaba.jstorm.transactional.TransactionTopologyBuilder; -import com.alibaba.jstorm.utils.JStormUtils; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.beam.runners.jstorm.serialization.ImmutableListSerializer; @@ -54,12 +50,9 @@ import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout; import org.apache.beam.runners.jstorm.translation.translator.Stream; import org.apache.beam.runners.jstorm.translation.util.CommonInstance; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineRunner; -import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,204 +63,218 @@ import org.slf4j.LoggerFactory; public class JStormRunner extends PipelineRunner<JStormRunnerResult> { - private static final Logger LOG = LoggerFactory.getLogger(JStormRunner.class); - - private JStormPipelineOptions options; - - public JStormRunner(JStormPipelineOptions options) { - this.options = options; + private static final Logger LOG = LoggerFactory.getLogger(JStormRunner.class); + + private JStormPipelineOptions options; + + public JStormRunner(JStormPipelineOptions options) { + this.options = options; + } + + public static JStormRunner fromOptions(PipelineOptions options) { + JStormPipelineOptions pipelineOptions = PipelineOptionsValidator.validate( + JStormPipelineOptions.class, options); + return new JStormRunner(pipelineOptions); + } + + /** + * convert pipeline options to storm configuration format + * + * @param options + * @return + */ + private Config convertPipelineOptionsToConfig(JStormPipelineOptions options) { + Config config = new Config(); + if (options.getLocalMode()) + config.put(Config.STORM_CLUSTER_MODE, "local"); + else + config.put(Config.STORM_CLUSTER_MODE, "distributed"); + + Config.setNumWorkers(config, options.getWorkerNumber()); + + config.putAll(options.getTopologyConfig()); + + // Setup config for runtime env + config.put("worker.external", "beam"); + config.put("topology.acker.executors", 0); + + UnmodifiableCollectionsSerializer.registerSerializers(config); + // register classes of guava utils, ImmutableList, ImmutableSet, ImmutableMap + ImmutableListSerializer.registerSerializers(config); + SdkRepackImmuListSerializer.registerSerializers(config); + ImmutableSetSerializer.registerSerializers(config); + SdkRepackImmuSetSerializer.registerSerializers(config); + ImmutableMapSerializer.registerSerializers(config); + + config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class); + return config; + } + + @Override + public JStormRunnerResult run(Pipeline pipeline) { + LOG.info("Running pipeline..."); + TranslationContext context = new TranslationContext(this.options); + StormPipelineTranslator transformer = new StormPipelineTranslator(context); + transformer.translate(pipeline); + LOG.info("UserGraphContext=\n{}", context.getUserGraphContext()); + LOG.info("ExecutionGraphContext=\n{}", context.getExecutionGraphContext()); + + for (Stream stream : context.getExecutionGraphContext().getStreams()) { + LOG.info( + stream.getProducer().getComponentId() + " --> " + stream.getConsumer().getComponentId()); } - public static JStormRunner fromOptions(PipelineOptions options) { - JStormPipelineOptions pipelineOptions = PipelineOptionsValidator.validate(JStormPipelineOptions.class, options); - return new JStormRunner(pipelineOptions); + String topologyName = options.getJobName(); + Config config = convertPipelineOptionsToConfig(options); + + return runTopology( + topologyName, + getTopology(options, context.getExecutionGraphContext()), + config); + } + + private JStormRunnerResult runTopology( + String topologyName, + StormTopology topology, + Config config) { + try { + if (StormConfig.local_mode(config)) { + LocalCluster localCluster = LocalCluster.getInstance(); + localCluster.submitTopology(topologyName, config, topology); + return JStormRunnerResult.local( + topologyName, config, localCluster, options.getLocalModeExecuteTime()); + } else { + StormSubmitter.submitTopology(topologyName, config, topology); + return null; + } + } catch (Exception e) { + LOG.warn("Fail to submit topology", e); + throw new RuntimeException("Fail to submit topology", e); } - - /** - * convert pipeline options to storm configuration format - * @param options - * @return - */ - private Config convertPipelineOptionsToConfig(JStormPipelineOptions options) { - Config config = new Config(); - if (options.getLocalMode()) - config.put(Config.STORM_CLUSTER_MODE, "local"); - else - config.put(Config.STORM_CLUSTER_MODE, "distributed"); - - Config.setNumWorkers(config, options.getWorkerNumber()); - - config.putAll(options.getTopologyConfig()); - - // Setup config for runtime env - config.put("worker.external", "beam"); - config.put("topology.acker.executors", 0); - - UnmodifiableCollectionsSerializer.registerSerializers(config); - // register classes of guava utils, ImmutableList, ImmutableSet, ImmutableMap - ImmutableListSerializer.registerSerializers(config); - SdkRepackImmuListSerializer.registerSerializers(config); - ImmutableSetSerializer.registerSerializers(config); - SdkRepackImmuSetSerializer.registerSerializers(config); - ImmutableMapSerializer.registerSerializers(config); - - config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class); - return config; + } + + private AbstractComponent getComponent( + String id, TranslationContext.ExecutionGraphContext context) { + AbstractComponent component = null; + AdaptorBasicSpout spout = context.getSpout(id); + if (spout != null) { + component = spout; + } else { + AdaptorBasicBolt bolt = context.getBolt(id); + if (bolt != null) + component = bolt; } - @Override - public JStormRunnerResult run(Pipeline pipeline) { - LOG.info("Running pipeline..."); - TranslationContext context = new TranslationContext(this.options); - StormPipelineTranslator transformer = new StormPipelineTranslator(context); - transformer.translate(pipeline); - LOG.info("UserGraphContext=\n{}", context.getUserGraphContext()); - LOG.info("ExecutionGraphContext=\n{}", context.getExecutionGraphContext()); - - for (Stream stream : context.getExecutionGraphContext().getStreams()) { - LOG.info(stream.getProducer().getComponentId() + " --> " + stream.getConsumer().getComponentId()); - } + return component; + } - String topologyName = options.getJobName(); - Config config = convertPipelineOptionsToConfig(options); + private StormTopology getTopology( + JStormPipelineOptions options, TranslationContext.ExecutionGraphContext context) { + boolean isExactlyOnce = options.getExactlyOnceTopology(); + TopologyBuilder builder = + isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder(); - return runTopology( - topologyName, - getTopology(options, context.getExecutionGraphContext()), - config); + int parallelismNumber = options.getParallelismNumber(); + Map<String, AdaptorBasicSpout> spouts = context.getSpouts(); + for (String id : spouts.keySet()) { + IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id)); + builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber)); } - private JStormRunnerResult runTopology(String topologyName, StormTopology topology, Config config) { - try { - if (StormConfig.local_mode(config)) { - LocalCluster localCluster = LocalCluster.getInstance(); - localCluster.submitTopology(topologyName, config, topology); - return JStormRunnerResult.local( - topologyName, config, localCluster, options.getLocalModeExecuteTime()); - } else { - StormSubmitter.submitTopology(topologyName, config, topology); - return null; - } - } catch (Exception e) { - LOG.warn("Fail to submit topology", e); - throw new RuntimeException("Fail to submit topology", e); - } + HashMap<String, BoltDeclarer> declarers = new HashMap<>(); + Iterable<Stream> streams = context.getStreams(); + LOG.info("streams=" + streams); + for (Stream stream : streams) { + String destBoltId = stream.getConsumer().getComponentId(); + IRichBolt bolt = getBolt(isExactlyOnce, context.getBolt(destBoltId)); + BoltDeclarer declarer = declarers.get(destBoltId); + if (declarer == null) { + declarer = builder.setBolt( + destBoltId, + bolt, + getParallelismNum(context.getBolt(destBoltId), parallelismNumber)); + declarers.put(destBoltId, declarer); + } + + Stream.Grouping grouping = stream.getConsumer().getGrouping(); + String streamId = stream.getProducer().getStreamId(); + String srcBoltId = stream.getProducer().getComponentId(); + + // add stream output declare for "from" component + AbstractComponent component = getComponent(srcBoltId, context); + if (grouping.getType().equals(Stream.Grouping.Type.FIELDS)) + component.addKVOutputField(streamId); + else + component.addOutputField(streamId); + + // "to" component declares grouping to "from" component + switch (grouping.getType()) { + case SHUFFLE: + declarer.shuffleGrouping(srcBoltId, streamId); + break; + case FIELDS: + declarer.fieldsGrouping(srcBoltId, streamId, new Fields(grouping.getFields())); + break; + case ALL: + declarer.allGrouping(srcBoltId, streamId); + break; + case DIRECT: + declarer.directGrouping(srcBoltId, streamId); + break; + case GLOBAL: + declarer.globalGrouping(srcBoltId, streamId); + break; + case LOCAL_OR_SHUFFLE: + declarer.localOrShuffleGrouping(srcBoltId, streamId); + break; + case NONE: + declarer.noneGrouping(srcBoltId, streamId); + break; + default: + throw new UnsupportedOperationException("unsupported grouping type: " + grouping); + } + + // Subscribe grouping of water mark stream + component.addOutputField(CommonInstance.BEAM_WATERMARK_STREAM_ID); + declarer.allGrouping(srcBoltId, CommonInstance.BEAM_WATERMARK_STREAM_ID); } - private AbstractComponent getComponent(String id, TranslationContext.ExecutionGraphContext context) { - AbstractComponent component = null; - AdaptorBasicSpout spout = context.getSpout(id); - if (spout != null) { - component = spout; - } else { - AdaptorBasicBolt bolt = context.getBolt(id); - if (bolt != null) - component = bolt; - } - - return component; + if (isExactlyOnce) { + ((TransactionTopologyBuilder) builder).enableHdfs(); } - - private StormTopology getTopology(JStormPipelineOptions options, TranslationContext.ExecutionGraphContext context) { - boolean isExactlyOnce = options.getExactlyOnceTopology(); - TopologyBuilder builder = isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder(); - - int parallelismNumber = options.getParallelismNumber(); - Map<String, AdaptorBasicSpout> spouts = context.getSpouts(); - for (String id : spouts.keySet()) { - IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id)); - builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber)); - } - - HashMap<String, BoltDeclarer> declarers = new HashMap<>(); - Iterable<Stream> streams = context.getStreams(); - LOG.info("streams=" + streams); - for (Stream stream : streams) { - String destBoltId = stream.getConsumer().getComponentId(); - IRichBolt bolt = getBolt(isExactlyOnce, context.getBolt(destBoltId)); - BoltDeclarer declarer = declarers.get(destBoltId); - if (declarer == null) { - declarer = builder.setBolt(destBoltId, bolt, - getParallelismNum(context.getBolt(destBoltId), parallelismNumber)); - declarers.put(destBoltId, declarer); - } - - Stream.Grouping grouping = stream.getConsumer().getGrouping(); - String streamId = stream.getProducer().getStreamId(); - String srcBoltId = stream.getProducer().getComponentId(); - - // add stream output declare for "from" component - AbstractComponent component = getComponent(srcBoltId, context); - if (grouping.getType().equals(Stream.Grouping.Type.FIELDS)) - component.addKVOutputField(streamId); - else - component.addOutputField(streamId); - - // "to" component declares grouping to "from" component - switch (grouping.getType()) { - case SHUFFLE: - declarer.shuffleGrouping(srcBoltId, streamId); - break; - case FIELDS: - declarer.fieldsGrouping(srcBoltId, streamId, new Fields(grouping.getFields())); - break; - case ALL: - declarer.allGrouping(srcBoltId, streamId); - break; - case DIRECT: - declarer.directGrouping(srcBoltId, streamId); - break; - case GLOBAL: - declarer.globalGrouping(srcBoltId, streamId); - break; - case LOCAL_OR_SHUFFLE: - declarer.localOrShuffleGrouping(srcBoltId, streamId); - break; - case NONE: - declarer.noneGrouping(srcBoltId, streamId); - break; - default: - throw new UnsupportedOperationException("unsupported grouping type: " + grouping); - } - - // Subscribe grouping of water mark stream - component.addOutputField(CommonInstance.BEAM_WATERMARK_STREAM_ID); - declarer.allGrouping(srcBoltId, CommonInstance.BEAM_WATERMARK_STREAM_ID); - } - - if (isExactlyOnce) { - ((TransactionTopologyBuilder) builder).enableHdfs(); - } - return builder.createTopology(); - } - - private IRichSpout getSpout(boolean isExactlyOnce, IRichSpout spout) { - IRichSpout ret = null; - if (isExactlyOnce) { - if (spout instanceof UnboundedSourceSpout) { - ret = new TxUnboundedSourceSpout((UnboundedSourceSpout) spout); - } else { - String error = String.format("The specified type(%s) is not supported in exactly once mode yet!", spout.getClass().toString()); - throw new RuntimeException(error); - } - } else { - ret = spout; - } - return ret; - } - - private IRichBolt getBolt(boolean isExactlyOnce, ExecutorsBolt bolt) { - return isExactlyOnce ? new TxExecutorsBolt(bolt) : bolt; - } - - /** - * Calculate the final parallelism number according to the configured number and global number. - * @param component - * @param globalParallelismNum - * @return final parallelism number for the specified component - */ - private int getParallelismNum(AbstractComponent component, int globalParallelismNum) { - int configParallelismNum = component.getParallelismNum(); - return configParallelismNum > 0 ? configParallelismNum : globalParallelismNum; + return builder.createTopology(); + } + + private IRichSpout getSpout(boolean isExactlyOnce, IRichSpout spout) { + IRichSpout ret = null; + if (isExactlyOnce) { + if (spout instanceof UnboundedSourceSpout) { + ret = new TxUnboundedSourceSpout((UnboundedSourceSpout) spout); + } else { + String error = String.format( + "The specified type(%s) is not supported in exactly once mode yet!", + spout.getClass().toString()); + throw new RuntimeException(error); + } + } else { + ret = spout; } + return ret; + } + + private IRichBolt getBolt(boolean isExactlyOnce, ExecutorsBolt bolt) { + return isExactlyOnce ? new TxExecutorsBolt(bolt) : bolt; + } + + /** + * Calculate the final parallelism number according to the configured number and global number. + * + * @param component + * @param globalParallelismNum + * @return final parallelism number for the specified component + */ + private int getParallelismNum(AbstractComponent component, int globalParallelismNum) { + int configParallelismNum = component.getParallelismNum(); + return configParallelismNum > 0 ? configParallelismNum : globalParallelismNum; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java index 465236b..1b4d283 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java @@ -29,27 +29,28 @@ import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; * {@link JStormRunner}. */ public class JStormRunnerRegistrar { - private JStormRunnerRegistrar() {} + private JStormRunnerRegistrar() { + } - /** - * Register the {@link JStormPipelineOptions}. - */ - @AutoService(PipelineOptionsRegistrar.class) - public static class Options implements PipelineOptionsRegistrar { - @Override - public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { - return ImmutableList.<Class<? extends PipelineOptions>> of(JStormPipelineOptions.class); - } + /** + * Register the {@link JStormPipelineOptions}. + */ + @AutoService(PipelineOptionsRegistrar.class) + public static class Options implements PipelineOptionsRegistrar { + @Override + public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() { + return ImmutableList.<Class<? extends PipelineOptions>>of(JStormPipelineOptions.class); } + } - /** - * Register the {@link JStormRunner}. - */ - @AutoService(PipelineRunnerRegistrar.class) - public static class Runner implements PipelineRunnerRegistrar { - @Override - public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { - return ImmutableList.<Class<? extends PipelineRunner<?>>> of(JStormRunner.class); - } + /** + * Register the {@link JStormRunner}. + */ + @AutoService(PipelineRunnerRegistrar.class) + public static class Runner implements PipelineRunnerRegistrar { + @Override + public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() { + return ImmutableList.<Class<? extends PipelineRunner<?>>>of(JStormRunner.class); } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java index e15ee6d..797c899 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java @@ -33,76 +33,76 @@ import org.joda.time.Duration; */ public abstract class JStormRunnerResult implements PipelineResult { - public static JStormRunnerResult local( + public static JStormRunnerResult local( + String topologyName, + Config config, + LocalCluster localCluster, + long localModeExecuteTimeSecs) { + return new LocalStormPipelineResult( + topologyName, config, localCluster, localModeExecuteTimeSecs); + } + + private final String topologyName; + private final Config config; + + JStormRunnerResult(String topologyName, Config config) { + this.config = checkNotNull(config, "config"); + this.topologyName = checkNotNull(topologyName, "topologyName"); + } + + public State getState() { + return null; + } + + public Config getConfig() { + return config; + } + + public String getTopologyName() { + return topologyName; + } + + private static class LocalStormPipelineResult extends JStormRunnerResult { + + private LocalCluster localCluster; + private long localModeExecuteTimeSecs; + + LocalStormPipelineResult( String topologyName, Config config, LocalCluster localCluster, long localModeExecuteTimeSecs) { - return new LocalStormPipelineResult( - topologyName, config, localCluster, localModeExecuteTimeSecs); - } - - private final String topologyName; - private final Config config; - - JStormRunnerResult(String topologyName, Config config) { - this.config = checkNotNull(config, "config"); - this.topologyName = checkNotNull(topologyName, "topologyName"); + super(topologyName, config); + this.localCluster = checkNotNull(localCluster, "localCluster"); } - public State getState() { - return null; + @Override + public State cancel() throws IOException { + //localCluster.deactivate(getTopologyName()); + localCluster.killTopology(getTopologyName()); + localCluster.shutdown(); + JStormUtils.sleepMs(1000); + return State.CANCELLED; } - public Config getConfig() { - return config; + @Override + public State waitUntilFinish(Duration duration) { + return waitUntilFinish(); } - public String getTopologyName() { - return topologyName; + @Override + public State waitUntilFinish() { + JStormUtils.sleepMs(localModeExecuteTimeSecs * 1000); + try { + return cancel(); + } catch (IOException e) { + throw new RuntimeException(e); + } } - private static class LocalStormPipelineResult extends JStormRunnerResult { - - private LocalCluster localCluster; - private long localModeExecuteTimeSecs; - - LocalStormPipelineResult( - String topologyName, - Config config, - LocalCluster localCluster, - long localModeExecuteTimeSecs) { - super(topologyName, config); - this.localCluster = checkNotNull(localCluster, "localCluster"); - } - - @Override - public State cancel() throws IOException { - //localCluster.deactivate(getTopologyName()); - localCluster.killTopology(getTopologyName()); - localCluster.shutdown(); - JStormUtils.sleepMs(1000); - return State.CANCELLED; - } - - @Override - public State waitUntilFinish(Duration duration) { - return waitUntilFinish(); - } - - @Override - public State waitUntilFinish() { - JStormUtils.sleepMs(localModeExecuteTimeSecs * 1000); - try { - return cancel(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public MetricResults metrics() { - return null; - } + @Override + public MetricResults metrics() { + return null; } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java index b7ff4eb..e27efc0 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -1,10 +1,19 @@ package org.apache.beam.runners.jstorm; +import static com.google.common.base.Preconditions.checkNotNull; + import avro.shaded.com.google.common.collect.Maps; import com.alibaba.jstorm.common.metric.AsmMetric; -import com.alibaba.jstorm.metric.*; +import com.alibaba.jstorm.metric.AsmMetricRegistry; +import com.alibaba.jstorm.metric.AsmWindow; +import com.alibaba.jstorm.metric.JStormMetrics; +import com.alibaba.jstorm.metric.MetaType; +import com.alibaba.jstorm.metric.MetricType; import com.alibaba.jstorm.utils.JStormUtils; import com.google.common.base.Optional; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.options.PipelineOptions; @@ -12,109 +21,106 @@ import org.apache.beam.sdk.testing.PAssert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Iterator; -import java.util.Map; - -import static com.google.common.base.Preconditions.checkNotNull; - /** * Test JStorm runner. */ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> { - private static final Logger LOG = LoggerFactory.getLogger(TestJStormRunner.class); - - public static TestJStormRunner fromOptions(PipelineOptions options) { - return new TestJStormRunner(options.as(JStormPipelineOptions.class)); + private static final Logger LOG = LoggerFactory.getLogger(TestJStormRunner.class); + + public static TestJStormRunner fromOptions(PipelineOptions options) { + return new TestJStormRunner(options.as(JStormPipelineOptions.class)); + } + + private final JStormRunner stormRunner; + private final JStormPipelineOptions options; + + private TestJStormRunner(JStormPipelineOptions options) { + this.options = options; + Map conf = Maps.newHashMap(); + //conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString()); + options.setTopologyConfig(conf); + options.setLocalMode(true); + stormRunner = JStormRunner.fromOptions(checkNotNull(options, "options")); + } + + @Override + public JStormRunnerResult run(Pipeline pipeline) { + JStormRunnerResult result = stormRunner.run(pipeline); + + try { + int numberOfAssertions = PAssert.countAsserts(pipeline); + + LOG.info("Running JStorm job {} with {} expected assertions.", + result.getTopologyName(), numberOfAssertions); + if (numberOfAssertions == 0) { + // If assert number is zero, wait 5 sec + JStormUtils.sleepMs(5000); + return result; + } else { + for (int i = 0; i < 40; ++i) { + Optional<Boolean> success = checkForPAssertSuccess(numberOfAssertions); + if (success.isPresent() && success.get()) { + return result; + } else if (success.isPresent() && !success.get()) { + throw new AssertionError("Failed assertion checks."); + } else { + JStormUtils.sleepMs(500); + } + } + LOG.info("Assertion checks timed out."); + throw new AssertionError("Assertion checks timed out."); + } + } finally { + clearPAssertCount(); + cancel(result); } + } - private final JStormRunner stormRunner; - private final JStormPipelineOptions options; - - private TestJStormRunner(JStormPipelineOptions options) { - this.options = options; - Map conf = Maps.newHashMap(); - //conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString()); - options.setTopologyConfig(conf); - options.setLocalMode(true); - stormRunner = JStormRunner.fromOptions(checkNotNull(options, "options")); + private Optional<Boolean> checkForPAssertSuccess(int expectedNumberOfAssertions) { + int successes = 0; + for (AsmMetric metric : + JStormMetrics.search(PAssert.SUCCESS_COUNTER, MetaType.TASK, MetricType.COUNTER)) { + successes += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue(); } - - @Override - public JStormRunnerResult run(Pipeline pipeline) { - JStormRunnerResult result = stormRunner.run(pipeline); - - try { - int numberOfAssertions = PAssert.countAsserts(pipeline); - - LOG.info("Running JStorm job {} with {} expected assertions.", result.getTopologyName(), numberOfAssertions); - if(numberOfAssertions == 0) { - // If assert number is zero, wait 5 sec - JStormUtils.sleepMs(5000); - return result; - } else { - for (int i = 0; i < 40; ++i) { - Optional<Boolean> success = checkForPAssertSuccess(numberOfAssertions); - if (success.isPresent() && success.get()) { - return result; - } else if (success.isPresent() && !success.get()) { - throw new AssertionError("Failed assertion checks."); - } else { - JStormUtils.sleepMs(500); - } - } - LOG.info("Assertion checks timed out."); - throw new AssertionError("Assertion checks timed out."); - } - } finally { - clearPAssertCount(); - cancel(result); - } + int failures = 0; + for (AsmMetric metric : + JStormMetrics.search(PAssert.FAILURE_COUNTER, MetaType.TASK, MetricType.COUNTER)) { + failures += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue(); } - private Optional<Boolean> checkForPAssertSuccess(int expectedNumberOfAssertions) { - int successes = 0; - for (AsmMetric metric : JStormMetrics.search(PAssert.SUCCESS_COUNTER, MetaType.TASK, MetricType.COUNTER)) { - successes += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue(); - } - int failures = 0; - for (AsmMetric metric : JStormMetrics.search(PAssert.FAILURE_COUNTER, MetaType.TASK, MetricType.COUNTER)) { - failures += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue(); - } - - if (failures > 0) { - LOG.info("Found {} success, {} failures out of {} expected assertions.", - successes, failures, expectedNumberOfAssertions); - return Optional.of(false); - } else if (successes >= expectedNumberOfAssertions) { - LOG.info("Found {} success, {} failures out of {} expected assertions.", - successes, failures, expectedNumberOfAssertions); - return Optional.of(true); - } - - LOG.info("Found {} success, {} failures out of {} expected assertions.", - successes, failures, expectedNumberOfAssertions); - return Optional.absent(); + if (failures > 0) { + LOG.info("Found {} success, {} failures out of {} expected assertions.", + successes, failures, expectedNumberOfAssertions); + return Optional.of(false); + } else if (successes >= expectedNumberOfAssertions) { + LOG.info("Found {} success, {} failures out of {} expected assertions.", + successes, failures, expectedNumberOfAssertions); + return Optional.of(true); } - private void clearPAssertCount() { - String topologyName = options.getJobName(); - AsmMetricRegistry taskMetrics = JStormMetrics.getTaskMetrics(); - Iterator<Map.Entry<String, AsmMetric>> itr = taskMetrics.getMetrics().entrySet().iterator(); - while (itr.hasNext()) { - Map.Entry<String, AsmMetric> metric = itr.next(); - if (metric.getKey().contains(topologyName)) { - itr.remove(); - } - } + LOG.info("Found {} success, {} failures out of {} expected assertions.", + successes, failures, expectedNumberOfAssertions); + return Optional.absent(); + } + + private void clearPAssertCount() { + String topologyName = options.getJobName(); + AsmMetricRegistry taskMetrics = JStormMetrics.getTaskMetrics(); + Iterator<Map.Entry<String, AsmMetric>> itr = taskMetrics.getMetrics().entrySet().iterator(); + while (itr.hasNext()) { + Map.Entry<String, AsmMetric> metric = itr.next(); + if (metric.getKey().contains(topologyName)) { + itr.remove(); + } } + } - private void cancel(JStormRunnerResult result) { - try { - result.cancel(); - } catch (IOException e) { - throw new RuntimeException("Failed to cancel.", e); -} + private void cancel(JStormRunnerResult result) { + try { + result.cancel(); + } catch (IOException e) { + throw new RuntimeException("Failed to cancel.", e); } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java index aa7d325..fa4eeb6 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java @@ -1,92 +1,108 @@ package org.apache.beam.runners.jstorm.serialization; import backtype.storm.Config; -import org.apache.beam.runners.jstorm.util.RunnerUtils; import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; -import com.google.common.collect.*; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableTable; +import com.google.common.collect.Lists; +import com.google.common.collect.Table; +import org.apache.beam.runners.jstorm.util.RunnerUtils; public class ImmutableListSerializer extends Serializer<ImmutableList<Object>> { - private static final boolean DOES_NOT_ACCEPT_NULL = false; - private static final boolean IMMUTABLE = true; + private static final boolean DOES_NOT_ACCEPT_NULL = false; + private static final boolean IMMUTABLE = true; - public ImmutableListSerializer() { - super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); - } + public ImmutableListSerializer() { + super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); + } - @Override - public void write(Kryo kryo, Output output, ImmutableList<Object> object) { - output.writeInt(object.size(), true); - for (Object elm : object) { - kryo.writeClassAndObject(output, elm); - } + @Override + public void write(Kryo kryo, Output output, ImmutableList<Object> object) { + output.writeInt(object.size(), true); + for (Object elm : object) { + kryo.writeClassAndObject(output, elm); } + } - @Override - public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) { - final int size = input.readInt(true); - final Object[] list = new Object[size]; - for (int i = 0; i < size; ++i) { - list[i] = kryo.readClassAndObject(input); - } - return ImmutableList.copyOf(list); + @Override + public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) { + final int size = input.readInt(true); + final Object[] list = new Object[size]; + for (int i = 0; i < size; ++i) { + list[i] = kryo.readClassAndObject(input); } + return ImmutableList.copyOf(list); + } - /** - * Creates a new {@link ImmutableListSerializer} and registers its serializer - * for the several ImmutableList related classes. - */ - public static void registerSerializers(Config config) { + /** + * Creates a new {@link ImmutableListSerializer} and registers its serializer + * for the several ImmutableList related classes. + */ + public static void registerSerializers(Config config) { - // ImmutableList (abstract class) - // +- RegularImmutableList - // | RegularImmutableList - // +- SingletonImmutableList - // | Optimized for List with only 1 element. - // +- SubList - // | Representation for part of ImmutableList - // +- ReverseImmutableList - // | For iterating in reverse order - // +- StringAsImmutableList - // | Used by Lists#charactersOf - // +- Values (ImmutableTable values) - // Used by return value of #values() when there are multiple cells + // ImmutableList (abstract class) + // +- RegularImmutableList + // | RegularImmutableList + // +- SingletonImmutableList + // | Optimized for List with only 1 element. + // +- SubList + // | Representation for part of ImmutableList + // +- ReverseImmutableList + // | For iterating in reverse order + // +- StringAsImmutableList + // | Used by Lists#charactersOf + // +- Values (ImmutableTable values) + // Used by return value of #values() when there are multiple cells - config.registerSerialization(ImmutableList.class, ImmutableListSerializer.class); - config.registerSerialization( - RunnerUtils.getBeamSdkRepackClass(ImmutableList.class), ImmutableListSerializer.class); + config.registerSerialization(ImmutableList.class, ImmutableListSerializer.class); + config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(ImmutableList.class), ImmutableListSerializer.class); - // Note: - // Only registering above is good enough for serializing/deserializing. - // but if using Kryo#copy, following is required. + // Note: + // Only registering above is good enough for serializing/deserializing. + // but if using Kryo#copy, following is required. - config.registerSerialization(ImmutableList.of().getClass(), ImmutableListSerializer.class); - config.registerSerialization( - RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().getClass()), ImmutableListSerializer.class); - config.registerSerialization(ImmutableList.of(1).getClass(), ImmutableListSerializer.class); - config.registerSerialization( - RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1).getClass()), ImmutableListSerializer.class); - config.registerSerialization(ImmutableList.of(1,2,3).subList(1, 2).getClass(), ImmutableListSerializer.class); - config.registerSerialization( - RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1,2,3).subList(1, 2).getClass()), ImmutableListSerializer.class); - config.registerSerialization(ImmutableList.of().reverse().getClass(), ImmutableListSerializer.class); - config.registerSerialization( - RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().reverse().getClass()), ImmutableListSerializer.class); + config.registerSerialization(ImmutableList.of().getClass(), ImmutableListSerializer.class); + config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().getClass()), + ImmutableListSerializer.class); + config.registerSerialization(ImmutableList.of(1).getClass(), ImmutableListSerializer.class); + config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1).getClass()), + ImmutableListSerializer.class); + config.registerSerialization( + ImmutableList.of(1, 2, 3).subList(1, 2).getClass(), + ImmutableListSerializer.class); + config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1, 2, 3).subList(1, 2).getClass()), + ImmutableListSerializer.class); + config.registerSerialization( + ImmutableList.of().reverse().getClass(), + ImmutableListSerializer.class); + config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().reverse().getClass()), + ImmutableListSerializer.class); - config.registerSerialization(Lists.charactersOf("KryoRocks").getClass(), ImmutableListSerializer.class); - config.registerSerialization( - RunnerUtils.getBeamSdkRepackClass(Lists.charactersOf("KryoRocks").getClass()), ImmutableListSerializer.class); + config.registerSerialization( + Lists.charactersOf("KryoRocks").getClass(), + ImmutableListSerializer.class); + config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(Lists.charactersOf("KryoRocks").getClass()), + ImmutableListSerializer.class); - Table<Integer,Integer,Integer> baseTable = HashBasedTable.create(); - baseTable.put(1, 2, 3); - baseTable.put(4, 5, 6); - Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable); - config.registerSerialization(table.values().getClass(), ImmutableListSerializer.class); - config.registerSerialization( - RunnerUtils.getBeamSdkRepackClass(table.values().getClass()), ImmutableListSerializer.class); + Table<Integer, Integer, Integer> baseTable = HashBasedTable.create(); + baseTable.put(1, 2, 3); + baseTable.put(4, 5, 6); + Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable); + config.registerSerialization(table.values().getClass(), ImmutableListSerializer.class); + config.registerSerialization( + RunnerUtils.getBeamSdkRepackClass(table.values().getClass()), + ImmutableListSerializer.class); - } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java index ee8b765..77eede3 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java @@ -7,55 +7,61 @@ import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; - import java.util.EnumMap; import java.util.HashMap; import java.util.Map; public class ImmutableMapSerializer extends Serializer<ImmutableMap<Object, ? extends Object>> { - private static final boolean DOES_NOT_ACCEPT_NULL = true; - private static final boolean IMMUTABLE = true; + private static final boolean DOES_NOT_ACCEPT_NULL = true; + private static final boolean IMMUTABLE = true; - public ImmutableMapSerializer() { - super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); - } + public ImmutableMapSerializer() { + super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); + } - @Override - public void write(Kryo kryo, Output output, ImmutableMap<Object, ? extends Object> immutableMap) { - kryo.writeObject(output, Maps.newHashMap(immutableMap)); - } + @Override + public void write(Kryo kryo, Output output, ImmutableMap<Object, ? extends Object> immutableMap) { + kryo.writeObject(output, Maps.newHashMap(immutableMap)); + } - @Override - public ImmutableMap<Object, Object> read(Kryo kryo, Input input, Class<ImmutableMap<Object, ? extends Object>> type) { - Map map = kryo.readObject(input, HashMap.class); - return ImmutableMap.copyOf(map); - } + @Override + public ImmutableMap<Object, Object> read( + Kryo kryo, + Input input, + Class<ImmutableMap<Object, ? extends Object>> type) { + Map map = kryo.readObject(input, HashMap.class); + return ImmutableMap.copyOf(map); + } - /** - * Creates a new {@link ImmutableMapSerializer} and registers its serializer - * for the several ImmutableMap related classes. - */ - public static void registerSerializers(Config config) { + /** + * Creates a new {@link ImmutableMapSerializer} and registers its serializer + * for the several ImmutableMap related classes. + */ + public static void registerSerializers(Config config) { - config.registerSerialization(ImmutableMap.class, ImmutableMapSerializer.class); - config.registerSerialization(ImmutableMap.of().getClass(), ImmutableMapSerializer.class); + config.registerSerialization(ImmutableMap.class, ImmutableMapSerializer.class); + config.registerSerialization(ImmutableMap.of().getClass(), ImmutableMapSerializer.class); - Object o1 = new Object(); - Object o2 = new Object(); + Object o1 = new Object(); + Object o2 = new Object(); - config.registerSerialization(ImmutableMap.of(o1, o1).getClass(), ImmutableMapSerializer.class); - config.registerSerialization(ImmutableMap.of(o1, o1, o2, o2).getClass(), ImmutableMapSerializer.class); - Map<DummyEnum,Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class); - for (DummyEnum e : DummyEnum.values()) { - enumMap.put(e, o1); - } - - config.registerSerialization(ImmutableMap.copyOf(enumMap).getClass(), ImmutableMapSerializer.class); + config.registerSerialization(ImmutableMap.of(o1, o1).getClass(), ImmutableMapSerializer.class); + config.registerSerialization( + ImmutableMap.of(o1, o1, o2, o2).getClass(), + ImmutableMapSerializer.class); + Map<DummyEnum, Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class); + for (DummyEnum e : DummyEnum.values()) { + enumMap.put(e, o1); } - private enum DummyEnum { - VALUE1, - VALUE2 - } + config.registerSerialization( + ImmutableMap.copyOf(enumMap).getClass(), + ImmutableMapSerializer.class); + } + + private enum DummyEnum { + VALUE1, + VALUE2 + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java index cdc4382..3a43b2b 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java @@ -10,62 +10,63 @@ import com.google.common.collect.Sets; public class ImmutableSetSerializer extends Serializer<ImmutableSet<Object>> { - private static final boolean DOES_NOT_ACCEPT_NULL = false; - private static final boolean IMMUTABLE = true; + private static final boolean DOES_NOT_ACCEPT_NULL = false; + private static final boolean IMMUTABLE = true; - public ImmutableSetSerializer() { - super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); - } + public ImmutableSetSerializer() { + super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); + } - @Override - public void write(Kryo kryo, Output output, ImmutableSet<Object> object) { - output.writeInt(object.size(), true); - for (Object elm : object) { - kryo.writeClassAndObject(output, elm); - } + @Override + public void write(Kryo kryo, Output output, ImmutableSet<Object> object) { + output.writeInt(object.size(), true); + for (Object elm : object) { + kryo.writeClassAndObject(output, elm); } + } - @Override - public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) { - final int size = input.readInt(true); - ImmutableSet.Builder<Object> builder = ImmutableSet.builder(); - for (int i = 0; i < size; ++i) { - builder.add(kryo.readClassAndObject(input)); - } - return builder.build(); + @Override + public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) { + final int size = input.readInt(true); + ImmutableSet.Builder<Object> builder = ImmutableSet.builder(); + for (int i = 0; i < size; ++i) { + builder.add(kryo.readClassAndObject(input)); } + return builder.build(); + } - /** - * Creates a new {@link ImmutableSetSerializer} and registers its serializer - * for the several ImmutableSet related classes. - */ - public static void registerSerializers(Config config) { + /** + * Creates a new {@link ImmutableSetSerializer} and registers its serializer + * for the several ImmutableSet related classes. + */ + public static void registerSerializers(Config config) { - // ImmutableList (abstract class) - // +- EmptyImmutableSet - // | EmptyImmutableSet - // +- SingletonImmutableSet - // | Optimized for Set with only 1 element. - // +- RegularImmutableSet - // | RegularImmutableList - // +- EnumImmutableSet - // | EnumImmutableSet + // ImmutableList (abstract class) + // +- EmptyImmutableSet + // | EmptyImmutableSet + // +- SingletonImmutableSet + // | Optimized for Set with only 1 element. + // +- RegularImmutableSet + // | RegularImmutableList + // +- EnumImmutableSet + // | EnumImmutableSet - config.registerSerialization(ImmutableSet.class, ImmutableSetSerializer.class); + config.registerSerialization(ImmutableSet.class, ImmutableSetSerializer.class); - // Note: - // Only registering above is good enough for serializing/deserializing. - // but if using Kryo#copy, following is required. + // Note: + // Only registering above is good enough for serializing/deserializing. + // but if using Kryo#copy, following is required. - config.registerSerialization(ImmutableSet.of().getClass(), ImmutableSetSerializer.class); - config.registerSerialization(ImmutableSet.of(1).getClass(), ImmutableSetSerializer.class); - config.registerSerialization(ImmutableSet.of(1,2,3).getClass(), ImmutableSetSerializer.class); + config.registerSerialization(ImmutableSet.of().getClass(), ImmutableSetSerializer.class); + config.registerSerialization(ImmutableSet.of(1).getClass(), ImmutableSetSerializer.class); + config.registerSerialization(ImmutableSet.of(1, 2, 3).getClass(), ImmutableSetSerializer.class); - config.registerSerialization( - Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(), ImmutableSetSerializer.class); - } + config.registerSerialization( + Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(), + ImmutableSetSerializer.class); + } - private enum SomeEnum { - A, B, C - } + private enum SomeEnum { + A, B, C + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java index decfb3f..b47f3b7 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java @@ -6,50 +6,49 @@ import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; import com.google.common.collect.Lists; - import java.util.Iterator; import java.util.List; public class KvStoreIterableSerializer extends Serializer<KvStoreIterable<Object>> { - public KvStoreIterableSerializer() { + public KvStoreIterableSerializer() { - } + } - @Override - public void write(Kryo kryo, Output output, KvStoreIterable<Object> object) { - List<Object> values = Lists.newArrayList(object); - output.writeInt(values.size(), true); - for (Object elm : object) { - kryo.writeClassAndObject(output, elm); - } + @Override + public void write(Kryo kryo, Output output, KvStoreIterable<Object> object) { + List<Object> values = Lists.newArrayList(object); + output.writeInt(values.size(), true); + for (Object elm : object) { + kryo.writeClassAndObject(output, elm); } - - @Override - public KvStoreIterable<Object> read(Kryo kryo, Input input, Class<KvStoreIterable<Object>> type) { - final int size = input.readInt(true); - List<Object> values = Lists.newArrayList(); - for (int i = 0; i < size; ++i) { - values.add(kryo.readClassAndObject(input)); - } - - return new KvStoreIterable<Object>() { - Iterable<Object> values; - - @Override - public Iterator<Object> iterator() { - return values.iterator(); - } - - public KvStoreIterable init(Iterable<Object> values) { - this.values = values; - return this; - } - - @Override - public String toString() { - return values.toString(); - } - }.init(values); + } + + @Override + public KvStoreIterable<Object> read(Kryo kryo, Input input, Class<KvStoreIterable<Object>> type) { + final int size = input.readInt(true); + List<Object> values = Lists.newArrayList(); + for (int i = 0; i < size; ++i) { + values.add(kryo.readClassAndObject(input)); } + + return new KvStoreIterable<Object>() { + Iterable<Object> values; + + @Override + public Iterator<Object> iterator() { + return values.iterator(); + } + + public KvStoreIterable init(Iterable<Object> values) { + this.values = values; + return this; + } + + @Override + public String toString() { + return values.toString(); + } + }.init(values); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java index 9bb315b..dd4272c 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java @@ -6,73 +6,83 @@ import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; -import org.apache.beam.sdk.repackaged.com.google.common.collect.*; +import org.apache.beam.sdk.repackaged.com.google.common.collect.HashBasedTable; +import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableTable; +import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists; +import org.apache.beam.sdk.repackaged.com.google.common.collect.Table; public class SdkRepackImmuListSerializer extends Serializer<ImmutableList<Object>> { - private static final boolean DOES_NOT_ACCEPT_NULL = false; - private static final boolean IMMUTABLE = true; + private static final boolean DOES_NOT_ACCEPT_NULL = false; + private static final boolean IMMUTABLE = true; - public SdkRepackImmuListSerializer() { - super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); - } + public SdkRepackImmuListSerializer() { + super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); + } - @Override - public void write(Kryo kryo, Output output, ImmutableList<Object> object) { - output.writeInt(object.size(), true); - for (Object elm : object) { - kryo.writeClassAndObject(output, elm); - } + @Override + public void write(Kryo kryo, Output output, ImmutableList<Object> object) { + output.writeInt(object.size(), true); + for (Object elm : object) { + kryo.writeClassAndObject(output, elm); } + } - @Override - public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) { - final int size = input.readInt(true); - final Object[] list = new Object[size]; - for (int i = 0; i < size; ++i) { - list[i] = kryo.readClassAndObject(input); - } - return ImmutableList.copyOf(list); + @Override + public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) { + final int size = input.readInt(true); + final Object[] list = new Object[size]; + for (int i = 0; i < size; ++i) { + list[i] = kryo.readClassAndObject(input); } + return ImmutableList.copyOf(list); + } - /** - * Creates a new {@link ImmutableListSerializer} and registers its serializer - * for the several ImmutableList related classes. - */ - public static void registerSerializers(Config config) { + /** + * Creates a new {@link ImmutableListSerializer} and registers its serializer + * for the several ImmutableList related classes. + */ + public static void registerSerializers(Config config) { - // ImmutableList (abstract class) - // +- RegularImmutableList - // | RegularImmutableList - // +- SingletonImmutableList - // | Optimized for List with only 1 element. - // +- SubList - // | Representation for part of ImmutableList - // +- ReverseImmutableList - // | For iterating in reverse order - // +- StringAsImmutableList - // | Used by Lists#charactersOf - // +- Values (ImmutableTable values) - // Used by return value of #values() when there are multiple cells + // ImmutableList (abstract class) + // +- RegularImmutableList + // | RegularImmutableList + // +- SingletonImmutableList + // | Optimized for List with only 1 element. + // +- SubList + // | Representation for part of ImmutableList + // +- ReverseImmutableList + // | For iterating in reverse order + // +- StringAsImmutableList + // | Used by Lists#charactersOf + // +- Values (ImmutableTable values) + // Used by return value of #values() when there are multiple cells - config.registerSerialization(ImmutableList.class, SdkRepackImmuListSerializer.class); + config.registerSerialization(ImmutableList.class, SdkRepackImmuListSerializer.class); - // Note: - // Only registering above is good enough for serializing/deserializing. - // but if using Kryo#copy, following is required. + // Note: + // Only registering above is good enough for serializing/deserializing. + // but if using Kryo#copy, following is required. - config.registerSerialization(ImmutableList.of().getClass(), SdkRepackImmuListSerializer.class); - config.registerSerialization(ImmutableList.of(1).getClass(), SdkRepackImmuListSerializer.class); - config.registerSerialization(ImmutableList.of(1,2,3).subList(1, 2).getClass(), SdkRepackImmuListSerializer.class); - config.registerSerialization(ImmutableList.of().reverse().getClass(), SdkRepackImmuListSerializer.class); + config.registerSerialization(ImmutableList.of().getClass(), SdkRepackImmuListSerializer.class); + config.registerSerialization(ImmutableList.of(1).getClass(), SdkRepackImmuListSerializer.class); + config.registerSerialization( + ImmutableList.of(1, 2, 3).subList(1, 2).getClass(), + SdkRepackImmuListSerializer.class); + config.registerSerialization( + ImmutableList.of().reverse().getClass(), + SdkRepackImmuListSerializer.class); - config.registerSerialization(Lists.charactersOf("KryoRocks").getClass(), SdkRepackImmuListSerializer.class); + config.registerSerialization( + Lists.charactersOf("KryoRocks").getClass(), + SdkRepackImmuListSerializer.class); - Table<Integer,Integer,Integer> baseTable = HashBasedTable.create(); - baseTable.put(1, 2, 3); - baseTable.put(4, 5, 6); - Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable); - config.registerSerialization(table.values().getClass(), SdkRepackImmuListSerializer.class); + Table<Integer, Integer, Integer> baseTable = HashBasedTable.create(); + baseTable.put(1, 2, 3); + baseTable.put(4, 5, 6); + Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable); + config.registerSerialization(table.values().getClass(), SdkRepackImmuListSerializer.class); - } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java index a514645..6973c82 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java @@ -5,67 +5,71 @@ import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; -import org.apache.beam.sdk.repackaged.com.google.common.collect.*; +import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSet; +import org.apache.beam.sdk.repackaged.com.google.common.collect.Sets; public class SdkRepackImmuSetSerializer extends Serializer<ImmutableSet<Object>> { - private static final boolean DOES_NOT_ACCEPT_NULL = false; - private static final boolean IMMUTABLE = true; + private static final boolean DOES_NOT_ACCEPT_NULL = false; + private static final boolean IMMUTABLE = true; - public SdkRepackImmuSetSerializer() { - super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); - } + public SdkRepackImmuSetSerializer() { + super(DOES_NOT_ACCEPT_NULL, IMMUTABLE); + } - @Override - public void write(Kryo kryo, Output output, ImmutableSet<Object> object) { - output.writeInt(object.size(), true); - for (Object elm : object) { - kryo.writeClassAndObject(output, elm); - } + @Override + public void write(Kryo kryo, Output output, ImmutableSet<Object> object) { + output.writeInt(object.size(), true); + for (Object elm : object) { + kryo.writeClassAndObject(output, elm); } + } - @Override - public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) { - final int size = input.readInt(true); - ImmutableSet.Builder<Object> builder = ImmutableSet.builder(); - for (int i = 0; i < size; ++i) { - builder.add(kryo.readClassAndObject(input)); - } - return builder.build(); + @Override + public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) { + final int size = input.readInt(true); + ImmutableSet.Builder<Object> builder = ImmutableSet.builder(); + for (int i = 0; i < size; ++i) { + builder.add(kryo.readClassAndObject(input)); } + return builder.build(); + } - /** - * Creates a new {@link ImmutableSetSerializer} and registers its serializer - * for the several ImmutableSet related classes. - */ - public static void registerSerializers(Config config) { + /** + * Creates a new {@link ImmutableSetSerializer} and registers its serializer + * for the several ImmutableSet related classes. + */ + public static void registerSerializers(Config config) { - // ImmutableList (abstract class) - // +- EmptyImmutableSet - // | EmptyImmutableSet - // +- SingletonImmutableSet - // | Optimized for Set with only 1 element. - // +- RegularImmutableSet - // | RegularImmutableList - // +- EnumImmutableSet - // | EnumImmutableSet + // ImmutableList (abstract class) + // +- EmptyImmutableSet + // | EmptyImmutableSet + // +- SingletonImmutableSet + // | Optimized for Set with only 1 element. + // +- RegularImmutableSet + // | RegularImmutableList + // +- EnumImmutableSet + // | EnumImmutableSet - config.registerSerialization(ImmutableSet.class, SdkRepackImmuSetSerializer.class); + config.registerSerialization(ImmutableSet.class, SdkRepackImmuSetSerializer.class); - // Note: - // Only registering above is good enough for serializing/deserializing. - // but if using Kryo#copy, following is required. + // Note: + // Only registering above is good enough for serializing/deserializing. + // but if using Kryo#copy, following is required. - config.registerSerialization(ImmutableSet.of().getClass(), SdkRepackImmuSetSerializer.class); - config.registerSerialization(ImmutableSet.of(1).getClass(), SdkRepackImmuSetSerializer.class); - config.registerSerialization(ImmutableSet.of(1,2,3).getClass(), SdkRepackImmuSetSerializer.class); + config.registerSerialization(ImmutableSet.of().getClass(), SdkRepackImmuSetSerializer.class); + config.registerSerialization(ImmutableSet.of(1).getClass(), SdkRepackImmuSetSerializer.class); + config.registerSerialization( + ImmutableSet.of(1, 2, 3).getClass(), + SdkRepackImmuSetSerializer.class); - config.registerSerialization( - Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(), SdkRepackImmuSetSerializer.class); - } + config.registerSerialization( + Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(), + SdkRepackImmuSetSerializer.class); + } - private enum SomeEnum { - A, B, C - } + private enum SomeEnum { + A, B, C + } } http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java index c8b0138..bcee778 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java @@ -5,155 +5,177 @@ import com.alibaba.jstorm.esotericsoftware.kryo.Kryo; import com.alibaba.jstorm.esotericsoftware.kryo.Serializer; import com.alibaba.jstorm.esotericsoftware.kryo.io.Input; import com.alibaba.jstorm.esotericsoftware.kryo.io.Output; - import java.lang.reflect.Field; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; public class UnmodifiableCollectionsSerializer extends Serializer<Object> { - private static final Field SOURCE_COLLECTION_FIELD; - private static final Field SOURCE_MAP_FIELD; + private static final Field SOURCE_COLLECTION_FIELD; + private static final Field SOURCE_MAP_FIELD; - static { - try { - SOURCE_COLLECTION_FIELD = Class.forName("java.util.Collections$UnmodifiableCollection" ) - .getDeclaredField( "c" ); - SOURCE_COLLECTION_FIELD.setAccessible( true ); + static { + try { + SOURCE_COLLECTION_FIELD = Class.forName("java.util.Collections$UnmodifiableCollection") + .getDeclaredField("c"); + SOURCE_COLLECTION_FIELD.setAccessible(true); - SOURCE_MAP_FIELD = Class.forName("java.util.Collections$UnmodifiableMap" ) - .getDeclaredField( "m" ); - SOURCE_MAP_FIELD.setAccessible( true ); - } catch ( final Exception e ) { - throw new RuntimeException( "Could not access source collection" + - " field in java.util.Collections$UnmodifiableCollection.", e ); - } + SOURCE_MAP_FIELD = Class.forName("java.util.Collections$UnmodifiableMap") + .getDeclaredField("m"); + SOURCE_MAP_FIELD.setAccessible(true); + } catch (final Exception e) { + throw new RuntimeException("Could not access source collection" + + " field in java.util.Collections$UnmodifiableCollection.", e); } - - @Override - public Object read(final Kryo kryo, final Input input, final Class<Object> clazz) { - final int ordinal = input.readInt( true ); - final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal]; - final Object sourceCollection = kryo.readClassAndObject( input ); - return unmodifiableCollection.create( sourceCollection ); + } + + @Override + public Object read(final Kryo kryo, final Input input, final Class<Object> clazz) { + final int ordinal = input.readInt(true); + final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal]; + final Object sourceCollection = kryo.readClassAndObject(input); + return unmodifiableCollection.create(sourceCollection); + } + + @Override + public void write(final Kryo kryo, final Output output, final Object object) { + try { + final UnmodifiableCollection unmodifiableCollection = + UnmodifiableCollection.valueOfType(object.getClass()); + // the ordinal could be replaced by s.th. else (e.g. a explicitely managed "id") + output.writeInt(unmodifiableCollection.ordinal(), true); + kryo.writeClassAndObject(output, unmodifiableCollection.sourceCollectionField.get(object)); + } catch (final RuntimeException e) { + // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write... + // handles SerializationException specifically (resizing the buffer)... + throw e; + } catch (final Exception e) { + throw new RuntimeException(e); } - - @Override - public void write(final Kryo kryo, final Output output, final Object object) { - try { - final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( object.getClass() ); - // the ordinal could be replaced by s.th. else (e.g. a explicitely managed "id") - output.writeInt( unmodifiableCollection.ordinal(), true ); - kryo.writeClassAndObject( output, unmodifiableCollection.sourceCollectionField.get( object ) ); - } catch ( final RuntimeException e ) { - // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write... - // handles SerializationException specifically (resizing the buffer)... - throw e; - } catch ( final Exception e ) { - throw new RuntimeException( e ); - } + } + + @Override + public Object copy(Kryo kryo, Object original) { + try { + final UnmodifiableCollection unmodifiableCollection = + UnmodifiableCollection.valueOfType(original.getClass()); + Object sourceCollectionCopy = + kryo.copy(unmodifiableCollection.sourceCollectionField.get(original)); + return unmodifiableCollection.create(sourceCollectionCopy); + } catch (final RuntimeException e) { + // Don't eat and wrap RuntimeExceptions + throw e; + } catch (final Exception e) { + throw new RuntimeException(e); } - - @Override - public Object copy(Kryo kryo, Object original) { - try { - final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( original.getClass() ); - Object sourceCollectionCopy = kryo.copy(unmodifiableCollection.sourceCollectionField.get(original)); - return unmodifiableCollection.create( sourceCollectionCopy ); - } catch ( final RuntimeException e ) { - // Don't eat and wrap RuntimeExceptions - throw e; - } catch ( final Exception e ) { - throw new RuntimeException( e ); - } + } + + private static enum UnmodifiableCollection { + COLLECTION( + Collections.unmodifiableCollection(Arrays.asList("")).getClass(), + SOURCE_COLLECTION_FIELD) { + @Override + public Object create(final Object sourceCollection) { + return Collections.unmodifiableCollection((Collection<?>) sourceCollection); + } + }, + RANDOM_ACCESS_LIST( + Collections.unmodifiableList(new ArrayList<Void>()).getClass(), + SOURCE_COLLECTION_FIELD) { + @Override + public Object create(final Object sourceCollection) { + return Collections.unmodifiableList((List<?>) sourceCollection); + } + }, + LIST(Collections.unmodifiableList(new LinkedList<Void>()).getClass(), SOURCE_COLLECTION_FIELD) { + @Override + public Object create(final Object sourceCollection) { + return Collections.unmodifiableList((List<?>) sourceCollection); + } + }, + SET(Collections.unmodifiableSet(new HashSet<Void>()).getClass(), SOURCE_COLLECTION_FIELD) { + @Override + public Object create(final Object sourceCollection) { + return Collections.unmodifiableSet((Set<?>) sourceCollection); + } + }, + SORTED_SET( + Collections.unmodifiableSortedSet(new TreeSet<Void>()).getClass(), + SOURCE_COLLECTION_FIELD) { + @Override + public Object create(final Object sourceCollection) { + return Collections.unmodifiableSortedSet((SortedSet<?>) sourceCollection); + } + }, + MAP(Collections.unmodifiableMap(new HashMap<Void, Void>()).getClass(), SOURCE_MAP_FIELD) { + @Override + public Object create(final Object sourceCollection) { + return Collections.unmodifiableMap((Map<?, ?>) sourceCollection); + } + + }, + SORTED_MAP( + Collections.unmodifiableSortedMap(new TreeMap<Void, Void>()).getClass(), + SOURCE_MAP_FIELD) { + @Override + public Object create(final Object sourceCollection) { + return Collections.unmodifiableSortedMap((SortedMap<?, ?>) sourceCollection); + } + }; + + private final Class<?> type; + private final Field sourceCollectionField; + + private UnmodifiableCollection(final Class<?> type, final Field sourceCollectionField) { + this.type = type; + this.sourceCollectionField = sourceCollectionField; } - private static enum UnmodifiableCollection { - COLLECTION( Collections.unmodifiableCollection( Arrays.asList( "" ) ).getClass(), SOURCE_COLLECTION_FIELD ){ - @Override - public Object create( final Object sourceCollection ) { - return Collections.unmodifiableCollection( (Collection<?>) sourceCollection ); - } - }, - RANDOM_ACCESS_LIST( Collections.unmodifiableList( new ArrayList<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){ - @Override - public Object create( final Object sourceCollection ) { - return Collections.unmodifiableList( (List<?>) sourceCollection ); - } - }, - LIST( Collections.unmodifiableList( new LinkedList<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){ - @Override - public Object create( final Object sourceCollection ) { - return Collections.unmodifiableList( (List<?>) sourceCollection ); - } - }, - SET( Collections.unmodifiableSet( new HashSet<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){ - @Override - public Object create( final Object sourceCollection ) { - return Collections.unmodifiableSet( (Set<?>) sourceCollection ); - } - }, - SORTED_SET( Collections.unmodifiableSortedSet( new TreeSet<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){ - @Override - public Object create( final Object sourceCollection ) { - return Collections.unmodifiableSortedSet( (SortedSet<?>) sourceCollection ); - } - }, - MAP( Collections.unmodifiableMap( new HashMap<Void, Void>() ).getClass(), SOURCE_MAP_FIELD ) { - - @Override - public Object create( final Object sourceCollection ) { - return Collections.unmodifiableMap( (Map<?, ?>) sourceCollection ); - } - - }, - SORTED_MAP( Collections.unmodifiableSortedMap( new TreeMap<Void, Void>() ).getClass(), SOURCE_MAP_FIELD ) { - @Override - public Object create( final Object sourceCollection ) { - return Collections.unmodifiableSortedMap( (SortedMap<?, ?>) sourceCollection ); - } - }; - - private final Class<?> type; - private final Field sourceCollectionField; - - private UnmodifiableCollection( final Class<?> type, final Field sourceCollectionField ) { - this.type = type; - this.sourceCollectionField = sourceCollectionField; - } + /** + * @param sourceCollection + */ + public abstract Object create(Object sourceCollection); - /** - * @param sourceCollection - */ - public abstract Object create( Object sourceCollection ); - - static UnmodifiableCollection valueOfType(final Class<?> type ) { - for( final UnmodifiableCollection item : values() ) { - if ( item.type.equals( type ) ) { - return item; - } - } - throw new IllegalArgumentException( "The type " + type + " is not supported." ); + static UnmodifiableCollection valueOfType(final Class<?> type) { + for (final UnmodifiableCollection item : values()) { + if (item.type.equals(type)) { + return item; } - + } + throw new IllegalArgumentException("The type " + type + " is not supported."); } - /** - * Creates a new {@link UnmodifiableCollectionsSerializer} and registers its serializer - * for the several unmodifiable Collections that can be created via {@link Collections}, - * including {@link Map}s. - * - * @see Collections#unmodifiableCollection(Collection) - * @see Collections#unmodifiableList(List) - * @see Collections#unmodifiableSet(Set) - * @see Collections#unmodifiableSortedSet(SortedSet) - * @see Collections#unmodifiableMap(Map) - * @see Collections#unmodifiableSortedMap(SortedMap) - */ - public static void registerSerializers( Config config ) { - UnmodifiableCollection.values(); - for ( final UnmodifiableCollection item : UnmodifiableCollection.values() ) { - config.registerSerialization( item.type, UnmodifiableCollectionsSerializer.class ); - } + } + + /** + * Creates a new {@link UnmodifiableCollectionsSerializer} and registers its serializer + * for the several unmodifiable Collections that can be created via {@link Collections}, + * including {@link Map}s. + * + * @see Collections#unmodifiableCollection(Collection) + * @see Collections#unmodifiableList(List) + * @see Collections#unmodifiableSet(Set) + * @see Collections#unmodifiableSortedSet(SortedSet) + * @see Collections#unmodifiableMap(Map) + * @see Collections#unmodifiableSortedMap(SortedMap) + */ + public static void registerSerializers(Config config) { + UnmodifiableCollection.values(); + for (final UnmodifiableCollection item : UnmodifiableCollection.values()) { + config.registerSerialization(item.type, UnmodifiableCollectionsSerializer.class); } + } }
