The Cloud Collector is the first layer of the cloud warehouse and is responsible for gathering data from cloud services. It is implemented as a plugin for the Nsys Daemon. Two main subsystems are involved: the Management Agent and the Data Processor. The Management Agent collects data through the Nsys Workload Scheduler component. The Data Processor serves as an operational data store (ODS) for additional data processing in the integration layer of the cloud warehouse.
Nsys Cloud Collector Architecture
The plugin implements a new collector module for the daemon. In the plugin descriptor nsys-plugin.xml it looks as follows:
```xml
<collector key="cloud-warehouse-collector-dropbox" name="Cloud Warehouse Collector for Dropbox">
    <queries>
        <nql>SELECT Name, Size FROM Files</nql>
        <nql>SELECT Name, Size FROM Folders</nql>
    </queries>
    <data-processor-stages>
        <stage key="dropbox-stage-getdata" name="Get Dropbox data from a job"
               class="org.nsys.cloud.warehouse.collector.dropbox.dataprocessor.stage.GetDropboxDataStage" />
        <stage key="dropbox-stage-analyze" name="Analyze Dropbox data"
               class="org.nsys.cloud.warehouse.collector.dropbox.dataprocessor.stage.AnalyzeDropboxDataStage" />
        <stage key="dropbox-stage-storage" name="Store Dropbox data"
               class="org.nsys.cloud.warehouse.collector.dropbox.dataprocessor.stage.StoreDropboxDataStage" />
    </data-processor-stages>
</collector>
```
The queries element contains a list of NQL queries used to get data from a cloud service. Each cloud service needs a provider for NQL (Nsys Query Language). The provider translates an NQL query for a particular entity, such as Host or Datacenter, into the corresponding data access in the cloud service, which can be, for instance, a database or a file system reached through a web service.
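To make the provider idea concrete, here is a minimal sketch of how a query such as `SELECT Name, Size FROM Files` might be split into fields and an entity and then resolved against a data source. Everything in it is an assumption for illustration: `NqlQuerySketch`, `NqlProviderSketch` and `InMemoryNqlProvider` are hypothetical names, not part of the Nsys API, the grammar covers only the simple `SELECT ... FROM ...` form used in the descriptor, and an in-memory map stands in for the cloud service.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parsed form of a simple NQL query: SELECT <fields> FROM <entity>. */
final class NqlQuerySketch {
    // Assumption: only the flat SELECT ... FROM ... form is handled here.
    private static final Pattern SIMPLE_SELECT = Pattern.compile(
            "^\\s*SELECT\\s+(.+?)\\s+FROM\\s+(\\w+)\\s*$", Pattern.CASE_INSENSITIVE);

    final List<String> fields;
    final String entity;

    private NqlQuerySketch(List<String> fields, String entity) {
        this.fields = fields;
        this.entity = entity;
    }

    static NqlQuerySketch parse(String nql) {
        Matcher m = SIMPLE_SELECT.matcher(nql);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported NQL query: " + nql);
        }
        List<String> fields = new ArrayList<>();
        for (String field : m.group(1).split(",")) {
            fields.add(field.trim());
        }
        return new NqlQuerySketch(fields, m.group(2));
    }
}

/** Hypothetical provider contract: one implementation per cloud service. */
interface NqlProviderSketch {
    List<Map<String, Object>> execute(NqlQuerySketch query);
}

/** Stand-in provider that answers queries from an in-memory map instead of a web service. */
final class InMemoryNqlProvider implements NqlProviderSketch {
    private final Map<String, List<Map<String, Object>>> entities;

    InMemoryNqlProvider(Map<String, List<Map<String, Object>>> entities) {
        this.entities = entities;
    }

    @Override
    public List<Map<String, Object>> execute(NqlQuerySketch query) {
        List<Map<String, Object>> rows = entities.get(query.entity);
        if (rows == null) {
            throw new IllegalArgumentException("Unknown entity: " + query.entity);
        }
        List<Map<String, Object>> result = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            // Keep only the requested fields, in the order they were requested.
            Map<String, Object> projected = new LinkedHashMap<>();
            for (String field : query.fields) {
                projected.put(field, row.get(field));
            }
            result.add(projected);
        }
        return result;
    }
}
```

A real provider would replace the map lookup with calls to the cloud service's own API; the split between parsing and execution is only one possible design.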
The data-processor-stages element contains a list of stages for the Data Processor subsystem of the daemon. A stage represents one phase of data processing in the Data Processor component. A NeuralBag, which carries the data processed so far, is passed from stage to stage. A sample stage is shown below.
```java
package org.nsys.cloud.plugin.daemon;

import org.nsys.logging.Log;
import org.nsys.core.NeuralBag;
import org.nsys.daemon.core.DataProcessorStage;
import org.nsys.daemon.core.DataProcessingThread;

public class CollectorGetDataStage implements DataProcessorStage {
    private static final Log log = new Log(CollectorGetDataStage.class);

    public boolean process(NeuralBag bag, DataProcessingThread thread) {
        StringBuilder content = new StringBuilder();
        ...
        // Hand the collected content to the following stages through the bag.
        bag.add("data", content.toString());
        return true;
    }
}
```
Each stage has to implement the interface org.nsys.daemon.core.DataProcessorStage. All stages from a plugin are processed in the pipeline component of the Data Processor subsystem. The collector processes stages using multiple threads.
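The pipeline idea can be illustrated with a small, self-contained sketch. It does not use the real Nsys classes: `BagSketch`, `StageSketch` and `PipelineSketch` are hypothetical stand-ins for NeuralBag, DataProcessorStage and the pipeline component. Two behaviours are assumptions rather than documented facts: that a stage returning false stops processing of that bag, and that multithreading means one worker thread per bag with the stages of a single bag run in order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in for NeuralBag: a keyed container passed from stage to stage. */
final class BagSketch {
    // A bag is handled by a single worker at a time, so a plain map is enough here.
    private final Map<String, Object> values = new HashMap<>();

    void add(String key, Object value) { values.put(key, value); }

    Object get(String key) { return values.get(key); }
}

/** Hypothetical stand-in for DataProcessorStage (the thread argument is left out). */
interface StageSketch {
    boolean process(BagSketch bag);
}

final class PipelineSketch {
    private final List<StageSketch> stages;

    PipelineSketch(List<StageSketch> stages) {
        this.stages = new ArrayList<>(stages);
    }

    /** Runs the stages in order; assumption: a false result stops the pipeline for this bag. */
    boolean run(BagSketch bag) {
        for (StageSketch stage : stages) {
            if (!stage.process(bag)) {
                return false;
            }
        }
        return true;
    }

    /** Processes several bags in parallel, one task per bag, and returns one result per bag. */
    List<Boolean> runAll(List<BagSketch> bags, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Boolean>> futures = new ArrayList<>();
            for (BagSketch bag : bags) {
                Callable<Boolean> task = () -> run(bag);
                futures.add(pool.submit(task));
            }
            List<Boolean> results = new ArrayList<>();
            for (Future<Boolean> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the pipeline", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A stage failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Keeping each bag on one thread avoids locking inside the stages; a design that runs different stages of the same bag concurrently would need a thread-safe bag.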
In the cloud warehouse, the Data Processor acts as an operational data store used for additional operations. The first stage, CollectorGetDataStage, gets data from a job running in the workload scheduler. The job runs periodically, and each run executes an NQL query that returns data from a cloud service. Further stages in the pipeline perform analytical tasks, and the last stage, CollectorStorageStage, stores the collected data in a database, where it can later be used in reports.
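The periodic flow described above (run a job, execute a query, hand the result to the pipeline) can be sketched with the JDK's `ScheduledExecutorService`. This is a substitute used purely for illustration, not the Nsys Workload Scheduler, and `PeriodicCollectorSketch` is a hypothetical name. The query is modelled as a `Supplier<String>` and the pipeline as a `Consumer<String>`, both of which are assumptions.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical periodic collection job: run a query, pass the result to a pipeline. */
final class PeriodicCollectorSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Supplier<String> query;     // stands in for executing an NQL query
    private final Consumer<String> pipeline;  // stands in for the Data Processor pipeline

    PeriodicCollectorSketch(Supplier<String> query, Consumer<String> pipeline) {
        this.query = query;
        this.pipeline = pipeline;
    }

    /** Starts the job immediately and repeats it at the given period. */
    void start(long periodMillis) {
        scheduler.scheduleAtFixedRate(this::runOnce, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** One job run: fetch data, then hand it to the pipeline. */
    void runOnce() {
        try {
            pipeline.accept(query.get());
        } catch (RuntimeException e) {
            // scheduleAtFixedRate cancels all later runs if a run throws,
            // so failures are reported here instead of being propagated.
            System.err.println("Collection run failed: " + e);
        }
    }

    /** Stops the job and releases the scheduler thread. */
    void stop() {
        scheduler.shutdownNow();
    }
}
```

A caller would construct the collector with its own query and pipeline, call `start` with the desired period, and call `stop` on shutdown.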
Cloud Collector Plugin Configuration
```xml
<?xml version="1.0" encoding="UTF-8"?>
<nsys-plugin key="org.nsys.cloud.collector" name="Nsys Cloud Collector" plugin-version="1">
    <!-- Plugin Info -->
    <plugin-info>
        <description>Nsys Cloud Collector</description>
        <version>1.0.0.0</version>
        <vendor name="Nsys" url="http://nsys.org" />
    </plugin-info>

    <!-- Plugin modules -->
    <management-agent key="collector" name="Nsys Collector" class="org.nsys.cloud.Collector">
        <dependency>org.nsys.daemon:scheduler</dependency>
    </management-agent>
    ...
    <collector key="cloud-warehouse-collector-dropbox" name="Cloud Warehouse Collector for Dropbox">
        <queries>
            <nql>SELECT Name, Size FROM Files</nql>
            <nql>SELECT Name, Size FROM Folders</nql>
        </queries>
        <data-processor-stages>
            <stage key="dropbox-stage-getdata" name="Get Dropbox data from a job"
                   class="org.nsys.cloud.warehouse.collector.dropbox.dataprocessor.stage.GetDropboxDataStage" />
            <stage key="dropbox-stage-analyze" name="Analyze Dropbox data"
                   class="org.nsys.cloud.warehouse.collector.dropbox.dataprocessor.stage.AnalyzeDropboxDataStage" />
            <stage key="dropbox-stage-storage" name="Store Dropbox data"
                   class="org.nsys.cloud.warehouse.collector.dropbox.dataprocessor.stage.StoreDropboxDataStage" />
        </data-processor-stages>
    </collector>
    ...
</nsys-plugin>
```
A sample implementation of the collector module follows.
Collector Module Implementation
```java
package org.nsys.cloud.plugin.daemon;

import java.util.List;
import java.util.ArrayList;

import org.nsys.daemon.core.DataProcessorStage;
import org.nsys.daemon.plugin.Module;

public class CollectorModule extends Module<CollectorPlugin> {
    public static final String TYPE = "collector";

    // NQL queries declared in the <queries> element.
    private List<String> queries = new ArrayList<String>();
    // Stage modules declared in the <data-processor-stages> element.
    private List<Module<DataProcessorStage>> moduleStages = new ArrayList<Module<DataProcessorStage>>();

    public CollectorModule() {
        setType(TYPE);
    }

    public List<String> getQueries() {
        return queries;
    }

    public void setQueries(List<String> queries) {
        this.queries = queries;
    }

    // Resolves each stage module to its DataProcessorStage object.
    public List<DataProcessorStage> getStages() {
        List<DataProcessorStage> stages = new ArrayList<DataProcessorStage>();
        for (Module<DataProcessorStage> stage : moduleStages) {
            stages.add(stage.getObject());
        }
        return stages;
    }

    public List<Module<DataProcessorStage>> getModuleStages() {
        return moduleStages;
    }

    public void setModuleStages(List<Module<DataProcessorStage>> moduleStages) {
        this.moduleStages = moduleStages;
    }
}
```
```java
package org.nsys.cloud.plugin.daemon;

import java.util.List;
import java.util.ArrayList;

import org.nsys.daemon.core.DataProcessorStage;
import org.nsys.daemon.plugin.Module;
import org.nsys.daemon.plugin.ModuleEntity;
import org.nsys.daemon.plugin.ParseException;
import org.nsys.daemon.plugin.PluginHandler;
import org.nsys.daemon.plugin.AbstractModuleEntityHandler;

// Note: log and modules are not declared in this class; they are presumably
// inherited from AbstractModuleEntityHandler.
public class CollectorModuleHandler extends AbstractModuleEntityHandler<CollectorModule> {
    public static final String MODULE_COLLECTOR = "collector";
    public static final String QUERIES = "queries";
    public static final String NQL = "nql";
    // Matches the <data-processor-stages> element name used in the plugin descriptor.
    public static final String DATA_PROCESSOR_STAGES = "data-processor-stages";
    public static final String STAGE = "stage";

    private List<Module<DataProcessorStage>> moduleStages;

    @Override
    public void startDocument() {
        log.debugFormat("Starting parsing plugin descriptor for [%s] module", MODULE_COLLECTOR);
    }

    @Override
    public void endDocument() {
        log.debugFormat("Ended parsing plugin descriptor for [%s] module", MODULE_COLLECTOR);
    }

    @Override
    public void handleEntity(ModuleEntity entity, PluginHandler pluginHandler) throws ParseException {
        String entityName = entity.getName().toLowerCase();
        if (entityName.equals(MODULE_COLLECTOR)) {
            log.debugFormat("Found module [%s] with key [%s]", entityName, entity.getAttribute(Module.KEY));
            try {
                CollectorModule module = new CollectorModule();
                module.setKey(entity.getAttribute(Module.KEY));
                module.setName(entity.getAttribute(Module.NAME));

                // Collect queries and stages from the nested elements.
                moduleStages = new ArrayList<Module<DataProcessorStage>>();
                for (ModuleEntity e : entity.getNestedEntities()) {
                    String eName = e.getName().toLowerCase();
                    if (eName.equals(QUERIES)) {
                        processQueries(e, module);
                    } else if (eName.equals(DATA_PROCESSOR_STAGES)) {
                        processStages(e, pluginHandler);
                    }
                }
                module.setModuleStages(moduleStages);
                modules.add(module);
            } catch (Exception e) {
                String msg = String.format("Unable to load module [%s] with key [%s]!",
                        entityName, entity.getAttribute(Module.KEY));
                log.error(msg, e);
            }
        }
    }

    private void processStages(ModuleEntity entity, PluginHandler pluginHandler) {
        for (ModuleEntity e : entity.getNestedEntities()) {
            String entityName = e.getName().toLowerCase();
            if (entityName.equals(STAGE)) {
                try {
                    Module<DataProcessorStage> stage = new Module<DataProcessorStage>();
                    stage.setType(STAGE);
                    stage.setKey(e.getAttribute(Module.KEY));
                    stage.setName(e.getAttribute(Module.NAME));
                    stage.setClassName(e.getAttribute(Module.CLASS_NAME));
                    moduleStages.add(stage);
                } catch (Exception ex) {
                    // Report the key of the failing <stage> element, not of its parent.
                    String msg = String.format("Unable to load module [%s] with key [%s]!",
                            entityName, e.getAttribute(Module.KEY));
                    log.error(msg, ex);
                }
            }
        }
    }

    private void processQueries(ModuleEntity entity, CollectorModule module) {
        for (ModuleEntity e : entity.getNestedEntities()) {
            String entityName = e.getName().toLowerCase();
            if (entityName.equals(NQL)) {
                module.getQueries().add(e.getText());
            }
        }
    }
}
```
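For readers without access to the Nsys plugin classes, the mapping from descriptor to queries and stage classes can be tried out with the JDK's DOM parser. This is a simplified substitute for the handler above, not the daemon's own parsing mechanism, and `CollectorDescriptorSketch` is a hypothetical name. It reads only the `key` attribute, the `nql` texts and the `class` attribute of each `stage`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

/** Hypothetical plain-data view of one <collector> element from nsys-plugin.xml. */
final class CollectorDescriptorSketch {
    final String key;
    final List<String> queries = new ArrayList<>();
    final List<String> stageClasses = new ArrayList<>();

    private CollectorDescriptorSketch(String key) {
        this.key = key;
    }

    /** Parses every <collector> element found in the given descriptor text. */
    static List<CollectorDescriptorSketch> parse(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
            Document doc = factory.newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));

            List<CollectorDescriptorSketch> result = new ArrayList<>();
            NodeList collectors = doc.getElementsByTagName("collector");
            for (int i = 0; i < collectors.getLength(); i++) {
                Element collector = (Element) collectors.item(i);
                CollectorDescriptorSketch descriptor =
                        new CollectorDescriptorSketch(collector.getAttribute("key"));

                // Each <nql> element holds one query as its text content.
                NodeList nql = collector.getElementsByTagName("nql");
                for (int j = 0; j < nql.getLength(); j++) {
                    descriptor.queries.add(nql.item(j).getTextContent().trim());
                }

                // Each <stage> element names its implementation in the class attribute.
                NodeList stages = collector.getElementsByTagName("stage");
                for (int j = 0; j < stages.getLength(); j++) {
                    descriptor.stageClasses.add(((Element) stages.item(j)).getAttribute("class"));
                }
                result.add(descriptor);
            }
            return result;
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("Invalid plugin descriptor", e);
        }
    }
}
```

The stage order in `stageClasses` follows document order, which is what a pipeline built from the descriptor would most plausibly rely on.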