Building Agents
Overview
To get started with developing a new Agent, create a new C# library project in Visual Studio and import the XMPro.IoT.Framework NuGet package. When writing the code for an Agent, you will have to implement a number of interfaces. Which interfaces to implement depends on the category under which your Agent will fall:
Listeners
Listeners are created by implementing IAgent and IPollingAgent interfaces. To push the events to the next receiver, the OnPublish event should be invoked and the events should be passed as arguments.
Action Agents/ Functions
Action Agents are created by implementing the IAgent and IReceivingAgent interfaces. The Receive method will be called every time events are received by this Agent. To publish these events again, the same logic as per the Listener Agent can be used.
Context Providers
Context Providers are created by implementing the IAgent, IPollingAgent interfaces. They are very similar to Listeners; however, Context Providers publish all available records/events when polled instead of only publishing the newer/changed ones.
Transformations
Transformations are implemented in a similar way as Action Agents, except that all Transformations should have the Require Input Map flag set to false and must not implement the GetInputAttributes method, hence it should be:
The interfaces that can be implemented are as follows:
The matrix below shows which interface needs to be implemented for which category Agent:
Listener
Required
Recommended
Optional
Optional
Context Provider
Required
Recommended
Optional
Optional
Transformation
Required
Optional
Required
Optional
Action Agent/ Function
Required
Optional
Required
Optional
AI & Machine Learning
Required
Optional
Required
Optional
The IPollingAgent interface is not strictly required for Listeners or Context Providers, however, it is generally used in most cases. Not implementing IPollingAgent for a Listener or Context Provider should be considered an advanced option.
IAgent
IAgent is the primary interface that must be implemented by all Agents as it provides the structure for the workings of the Agent. After implementing this interface, there are several methods you have to add to your project that forms part of this predefined structure.
Settings/Configurations
Some Agents need to be provided with configurations by the user, for example, for a CSV listener Agent to get records from a CSV file, it needs the following:
Polling interval (in seconds)
CSV file
CSV Definition
Each of these settings should be referenced in the code and must correspond to the settings template created when packaging your Agent.
A template is a JSON representation of all the controls and their layout that will be used to capture the settings from a user.
An example of the settings template (generated using the XMPro Package Manager) is shown in the image below. The settings in this example consist of the following controls:
Group (Data)
File Upload
Group (Payload)
Grid
Each control has a Key, which uniquely identifies it in the template and allows the Agent code to access its value at any time. To get the value contained in a setting, use the following code:
Before a template is rendered on the screen, or if a postback occurs on any control in the template, the method below would be called to allow the Agent an opportunity to make any necessary runtime changes to the template, for example, verifying user credentials, displaying all tables of a selected database in a drop-down list, etc. In this example, no changes are being made to the template but, if needed, they can be added to the todo section.
For a postback to occur after a user navigates out of a setting field, the Postback property needs to be set to true when packaging the Agent.
Validate
If a user tries to run an Integrity Check on a Data Stream in Data Stream Designer, all Agents will be requested to validate the configurations they have been provided. An Agent has to use this opportunity to inform the user about any configurations that are incorrect, for example, credentials that have expired, required values that are missing, etc.
To validate the configurations/ settings in an Agent, the Validate method needs to be implemented. This method returns an array of errors that occurred. If validation was successful, an empty array would be returned.
The example code below verifies if a user has specified a broker address, topic, and payload definition for an MQTT Agent:
Output Payload
Each Agent has the responsibility to inform the Engine about the structure of the payload that will be produced by the Agent. To do this, implement the following method:
This method returns a collection that has an Attribute type, which is a type that represents the name and type of a given attribute in the outgoing payload. As from XMPro.IOT.Framework version 3.0.2, comparison/ equality operations are also supported in Attribute, for example:
Create
Each Agent needs to implement a method called Create, which will be invoked when your Agent is being hosted. User-defined configuration is passed as a parameter to this method and should be stored in a class variable as far as possible for later use. This is a good point to provide any resources needed for the working of your Agent.
Start
The Start method needs to be implemented by all Agents. This method will be invoked when your Agent is hosted and starts to work.
Destroy
Each Agent needs to implement a Destroy method, which will be invoked if the Create method was called successfully, when a data stream is either being unpublished or it encounters an error and fails to start.
Use this method to release any resources or memory that your Agent may have acquired during its creation and lifetime.
Publishing Events
To push the events to the next Agent, your Agent should invoke the OnPublish event with the events passed as arguments:
Events are represented as JSON Objects and have to be pushed as a collection, i.e. JArray.
Please note that OnPublishArgs(Array rtr) is obsolete from XMPro.IOT.Framework 3.0.2 onwards. You are now required to specify the endpoint name on which you would like to publish (i.e. OnPublishArgs(Array rtr, string Endpoint))
Decrypting Values
If an Agent’s configuration contains a Secure/Password Textbox, its value will automatically be encrypted. To decrypt the value, use the following set of instructions:
Custom Events
While building your Agent, you may need to use external libraries or third-party event subscriptions to handle custom events. If these are used, you must catch any exceptions from the event handlers yourself, to prevent uncaught exceptions that could possibly crash the Data Stream if they get through.
IPolling Agent
The IPollingAgent interface allows time-based operations. Implementing this interface, and opting in to Polling by returning true from the RequiresPolling method, will automatically add a PollingInterval setting to the configuration template of your Agent, which can be used by the user to specify the interval for polling. The Poll method will be invoked every time the poll interval elapses.
This method will be called at regular intervals according to the Configuration settings, and can be used to perform any work or logic you wish, for example, querying a third-party system for changes.
The RequiresPolling method is an advanced option. It is expected that in most cases, this method should simply return a true value, which will not change the behaviour of the Agent. The PollingInterval setting will display as normal, and the Poll method will be called at that interval, as normal.
Advanced users, however, can use this method to decide to opt-out of Polling settings, by returning false. The parameters method parameter will contain the Stream Object's Configuration, allowing you to determine whether to return true to opt-in, or false to opt out, depending on what settings the user has selected. Opting out will cause the PollingInterval setting to not appear in the configuration tab, and the Poll method to never be called when the Stream is published.
This may be useful when the agent you are building can be configured to either actively query its configured third-party system for data at regular intervals, or set up a persistent connection to the third-party service and passively wait for that connection to deliver data.
If the Agent does not need to query for data at regular intervals, or perform other work or logic on a specific schedule, it is recommended to not implement IPollingAgent rather than always returning false from the RequiresPolling method.
IReceivingAgent
If your Agent is required to receive inputs from other Agents, you should implement the IReceivingAgent interface.
Input Payload
Each Agent is responsible to inform the Engine about the structure of the payload it consumes. To achieve this in your Agent, implement the following method:
This method returns a collection consisting of Attribute, which is a type that represents the name and type of a given attribute in the incoming payload.
Input Mapping
In most cases, if an incoming payload structure is supposed to be different from what the parent is sending, i.e. the Input Payload has been specified above, the user will have to map parent outputs to the current Agent’s inputs. To enable this, mark the Require Input Map flag as true in the Stream Integration Manager when packaging the Agent.
Endpoint
Each Agent can have a number of input and output endpoints. Endpoints are the points where incoming or outgoings arrows are connected. Each endpoint consists of a Name<String> attribute. You will be passed an endpoint name when queried for an Input payload definition. Be sure to specify the endpoint name when querying the parent’s output payload definition.
Parent Outputs
All receiving Agents can query the structure of parent Agent outputs connected at a given endpoint by invoking an event, as demonstrated in the example below:
Receiving Events
Events published to a receiving Agent can be received by implementing the following method:
The endpointName parameter will identify which endpoint the events have been received at.
It is not guaranteed that the Start method will be invoked before the Receive method. Use the Create method to execute any logic that needs to be executed before the Receive method is called.
IPublishError
An Agent can publish messages to an error endpoint by implementing the IPublishesError interface. An unhandled error in an Agent will be captured and error information will be published to the error endpoint.
Implement the interface member:
To push the error to the next Agent, the OnPublishError event should be invoked, and the error information should be passed as arguments:
Error endpoints should be enabled in XMPro Stream Integration Manager when packaging the Agent. This can be done by selecting the “Add On Error Endpoint?” checkbox. See the image on the right for an example.
Example
The code below is an example of a basic MQTT Listener Agent. Take note of how the interfaces and methods have been implemented.
Please note that this example uses the M2MqttDotnetCore 1.0.7 NuGet package.
Further Reading
Last updated