METRON-965: When a sync policy is specified in the HDFS Writer, it is not properly cloned, so the writer ends up syncing on every record (closes apache/incubator-metron#596)
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/64473d4e Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/64473d4e Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/64473d4e Branch: refs/heads/Metron_0.4.0 Commit: 64473d4e8399e71d0ac1a81b6e78e72b982e42f8 Parents: fee758b Author: cstella <ceste...@gmail.com> Authored: Sat May 20 09:57:13 2017 -0400 Committer: cstella <ceste...@gmail.com> Committed: Sat May 20 09:57:13 2017 -0400 ---------------------------------------------------------------------- .../writer/hdfs/ClonedSyncPolicyCreator.java | 47 ++++++++++++++++++++ .../apache/metron/writer/hdfs/HdfsWriter.java | 6 +-- .../hdfs/ClonedSyncPolicyCreatorTest.java | 42 +++++++++++++++++ 3 files changed, 92 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/64473d4e/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java new file mode 100644 index 0000000..4d32fc9 --- /dev/null +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.writer.hdfs; + +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.metron.common.utils.SerDeUtils; +import org.apache.storm.hdfs.bolt.sync.SyncPolicy; + +public class ClonedSyncPolicyCreator implements SyncPolicyCreator { + SyncPolicy syncPolicy; + public ClonedSyncPolicyCreator(SyncPolicy policy) { + syncPolicy = policy; + } + + @Override + public SyncPolicy create(String sensor, WriterConfiguration config) { + try { + //Deep-clone the configured SyncPolicy by serializing and deserializing it (SerDeUtils, kryo). + //Every call therefore returns a fresh, independent policy. Each handler (one handler per task & + //sensor type and one handler per file) needs its own copy because a policy can be stateful -- + // e.g. CountSyncPolicy tracks how many times mark() has been called -- and sharing one instance + // is what made the writer sync on every record (METRON-965). SyncPolicy does not implement + //Cloneable, hence the serialization round-trip; any failure is rethrown as IllegalStateException. + // Note: this is expensive, so it is meant to be called infrequently (once per sync) -- NOTE(review): confirm callers.
+ byte[] serializedForm = SerDeUtils.toBytes(syncPolicy); + return SerDeUtils.fromBytes(serializedForm, SyncPolicy.class); + } + catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/64473d4e/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java index e0ab502..c5d1e4f 100644 --- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java +++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java @@ -24,6 +24,7 @@ import org.apache.metron.common.dsl.StellarFunctions; import org.apache.metron.common.dsl.VariableResolver; import org.apache.metron.common.stellar.StellarCompiler; import org.apache.metron.common.stellar.StellarProcessor; +import org.apache.metron.common.utils.SerDeUtils; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.metron.common.configuration.writer.WriterConfiguration; @@ -37,8 +38,7 @@ import org.apache.storm.hdfs.bolt.sync.SyncPolicy; import org.apache.storm.hdfs.common.rotation.RotationAction; import org.json.simple.JSONObject; -import java.io.IOException; -import java.io.Serializable; +import java.io.*; import java.util.*; import java.util.function.Function; @@ -85,7 +85,7 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable { this.fileNameFormat.prepare(stormConfig,topologyContext); if(syncPolicy != null) { //if the user has specified the sync policy, we don't want to override their wishes.
- syncPolicyCreator = (source,config) -> syncPolicy; + syncPolicyCreator = new ClonedSyncPolicyCreator(syncPolicy); } else { //if the user has not, then we want to have the sync policy depend on the batch size. http://git-wip-us.apache.org/repos/asf/metron/blob/64473d4e/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreatorTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreatorTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreatorTest.java new file mode 100644 index 0000000..092bf0f --- /dev/null +++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreatorTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.metron.writer.hdfs; + +import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; +import org.apache.storm.hdfs.bolt.sync.SyncPolicy; +import org.junit.Assert; +import org.junit.Test; + +public class ClonedSyncPolicyCreatorTest { + + @Test + public void testClonedPolicy() { + CountSyncPolicy basePolicy = new CountSyncPolicy(5); + ClonedSyncPolicyCreator creator = new ClonedSyncPolicyCreator(basePolicy); + //Ensure the cloned policy still honors CountSyncPolicy(5)'s contract: false for the first 4 marks, true on the 5th. NOTE(review): the 5th call passes offset 5 rather than 4; presumably CountSyncPolicy ignores the offset -- confirm. + SyncPolicy clonedPolicy = creator.create("blah", null); + for(int i = 0;i < 4;++i) { + Assert.assertFalse(clonedPolicy.mark(null, i)); + } + Assert.assertTrue(clonedPolicy.mark(null, 5)); + //Re-clone and check the new copy starts fresh: its first mark() is false even though the previous clone just returned true, i.e. create() does not hand back shared state. + clonedPolicy = creator.create("blah", null); + Assert.assertFalse(clonedPolicy.mark(null, 0)); + } +}