Worker Framework

Cross Platform, Cloud Ready, Microservice Framework

Architecture

The Worker Framework is a set of Java modules that provide a starting point for developing Worker Microservices. It allows programmers to focus on adding business value instead of writing boilerplate code. It also provides execution infrastructure code that dynamically loads and runs worker tasks. The framework promotes best practices through a well-defined set of interfaces and base classes that developers work against.

Overview

The Worker Framework is composed of the following main functional parts:

  • the worker core application
  • APIs for a worker back-end and infrastructure plug-ins:
    • external data storage
    • messaging queue
    • configuration
    • serialization
  • default implementations of infrastructure plug-ins

Figure: Worker Framework Dependencies

Worker Core Application

The worker-core application manages the flow of data between infrastructure components and a worker back-end. It uses a service location mechanism to dynamically detect and load implementations of the required components, including the worker back-end.

Figure: Worker Flow

Service Location

The dependency loading mechanism, called service location, relies on the util-moduleloader library, which internally uses the Java ServiceLoader. To be instantiated as a service, a class must have a parameterless constructor. This is why all of the plug-ins use plug-in providers that instantiate the actual plug-in. The providers also need to be advertised using the ServiceLoader mechanism. Implementations of the required components have to be packaged with the worker-core application so that the service locator can detect and use them.
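The provider pattern described above can be illustrated with a minimal sketch. It uses only the standard java.util.ServiceLoader API; the names QueuePlugin, QueuePluginProvider and DefaultQueuePluginProvider are hypothetical and are not part of the framework or of util-moduleloader. In a real deployment the provider would also be advertised in a META-INF/services file named after the provider interface, which this standalone sketch does not include.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

final class ServiceLocationSketch {

    /** Hypothetical plug-in whose constructor needs an argument, so ServiceLoader cannot create it directly. */
    public static final class QueuePlugin {
        private final String queueName;

        QueuePlugin(String queueName) {
            this.queueName = queueName;
        }

        String describe() {
            return "queue plug-in bound to " + queueName;
        }
    }

    /** Hypothetical provider contract; this is the type that would be advertised to ServiceLoader. */
    public interface QueuePluginProvider {
        QueuePlugin getPlugin(String queueName);
    }

    /** Provider with a parameterless constructor that builds the actual plug-in. */
    public static final class DefaultQueuePluginProvider implements QueuePluginProvider {
        public DefaultQueuePluginProvider() {
        }

        @Override
        public QueuePlugin getPlugin(String queueName) {
            return new QueuePlugin(queueName);
        }
    }

    /** Collects every provider that ServiceLoader can find on the classpath. */
    static List<QueuePluginProvider> locateProviders() {
        List<QueuePluginProvider> found = new ArrayList<>();
        for (QueuePluginProvider provider : ServiceLoader.load(QueuePluginProvider.class)) {
            found.add(provider);
        }
        return found;
    }

    public static void main(String[] args) {
        List<QueuePluginProvider> providers = locateProviders();
        // No provider is registered in this standalone sketch, so fall back to the default one.
        QueuePluginProvider provider = providers.isEmpty()
                ? new DefaultQueuePluginProvider()
                : providers.get(0);
        System.out.println(provider.getPlugin("example-queue").describe());
    }
}
```

The provider carries no state of its own; its only job is to give ServiceLoader something it can construct without arguments.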

When running, the application waits for incoming messages. When a message is received, the application instantiates a Worker implementation and executes its doWork method.

To do this, it uses the WorkerFactoryProvider located during start-up to obtain a WorkerFactory, which is responsible for constructing the Worker implementation.

Worker Factory Provider

The WorkerFactoryProvider implementation is responsible for creating an instance of a WorkerFactory. It is resolved using the Java ServiceLoader, which means it must have a parameterless constructor. The interface has a single method, getWorkerFactory, which is called during application start-up. The method is supplied with the following parameters, representing worker framework components that can be used in the construction of a WorkerFactory:

  • configurationSource used to retrieve worker settings
  • datastore used for interaction with external data storage
  • codec used for serialization and de-serialization of messages
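As a rough illustration of the provider contract described above, the sketch below defines simplified stand-in interfaces and a provider that reads its settings once at start-up. The interface shapes, ExampleConfiguration, ExampleWorkerFactoryProvider and the fixedSource helper are assumptions made for this example; the framework's actual signatures may differ.

```java
final class WorkerFactoryProviderSketch {

    // Simplified stand-ins for framework components (assumptions, not the real API).
    interface ConfigurationSource {
        <T> T getConfiguration(Class<T> type);
    }

    interface DataStore {
    }

    interface Codec {
    }

    interface WorkerFactory {
        int getWorkerThreads();
    }

    interface WorkerFactoryProvider {
        WorkerFactory getWorkerFactory(ConfigurationSource configurationSource, DataStore datastore, Codec codec);
    }

    /** Hypothetical worker settings. */
    static final class ExampleConfiguration {
        final int threads;

        ExampleConfiguration(int threads) {
            this.threads = threads;
        }
    }

    /** Has a parameterless constructor so that ServiceLoader could instantiate it. */
    static final class ExampleWorkerFactoryProvider implements WorkerFactoryProvider {
        public ExampleWorkerFactoryProvider() {
        }

        @Override
        public WorkerFactory getWorkerFactory(ConfigurationSource configurationSource, DataStore datastore, Codec codec) {
            // Read the settings once at start-up; a fuller factory would also keep datastore and codec.
            ExampleConfiguration config = configurationSource.getConfiguration(ExampleConfiguration.class);
            return () -> config.threads;
        }
    }

    /** Helper for the demo: a configuration source that always returns the given object. */
    static ConfigurationSource fixedSource(Object value) {
        return new ConfigurationSource() {
            @Override
            public <T> T getConfiguration(Class<T> type) {
                return type.cast(value);
            }
        };
    }

    public static void main(String[] args) {
        WorkerFactory factory = new ExampleWorkerFactoryProvider().getWorkerFactory(
                fixedSource(new ExampleConfiguration(4)), new DataStore() { }, new Codec() { });
        System.out.println("Worker threads: " + factory.getWorkerThreads());
    }
}
```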

Worker Factory

The WorkerFactory creates a new instance of a Worker for each message received. It can supply the worker with any components that were passed to the factory by the WorkerFactoryProvider, including the configuration, data store and codec.

WorkerFactory requires the following methods to be implemented:

  • getWorker, which constructs a new worker instance and is supplied with the worker task data
  • getInvalidTaskQueue, which should return the name of the queue to be used when a task is not recognized
  • getWorkerThreads, which provides the maximum number of threads used by worker-core when executing the Worker logic

Additionally, a developer can implement the shutdown method to perform any required clean-up of resources when the worker application is closed.
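The factory contract described above might look roughly like the following. The interfaces are simplified stand-ins written for this example (task data is modelled as a byte array and the result as a String, and shutdown is modelled as a default method); UppercaseWorker, UppercaseWorkerFactory and the queue name are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

final class WorkerFactorySketch {

    /** Simplified stand-in for the Worker contract. */
    interface Worker {
        String doWork();
    }

    /** Simplified stand-in for the WorkerFactory contract (assumed shapes, not the real signatures). */
    interface WorkerFactory {
        Worker getWorker(byte[] taskData);

        String getInvalidTaskQueue();

        int getWorkerThreads();

        default void shutdown() {
        }
    }

    /** Hypothetical worker that upper-cases the text of its task. */
    static final class UppercaseWorker implements Worker {
        private final String text;

        UppercaseWorker(String text) {
            this.text = text;
        }

        @Override
        public String doWork() {
            return text.toUpperCase(Locale.ROOT);
        }
    }

    static final class UppercaseWorkerFactory implements WorkerFactory {
        private volatile boolean closed;

        @Override
        public Worker getWorker(byte[] taskData) {
            // A new worker instance is built for every task.
            return new UppercaseWorker(new String(taskData, StandardCharsets.UTF_8));
        }

        @Override
        public String getInvalidTaskQueue() {
            return "example-invalid-tasks";
        }

        @Override
        public int getWorkerThreads() {
            return 2;
        }

        @Override
        public void shutdown() {
            // Release any resources held by the factory; here only a flag is set.
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    public static void main(String[] args) {
        UppercaseWorkerFactory factory = new UppercaseWorkerFactory();
        Worker worker = factory.getWorker("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(worker.doWork());
        factory.shutdown();
    }
}
```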

There is a base implementation of the WorkerFactory interface, the AbstractWorkerFactory, providing support for de-serialization and validation of a task message as well as worker configuration. It should be used whenever possible.

Worker

Workers are stateless components designed to perform a specific task. A new worker instance will be created for every task.

A worker receives a task, performs the work and outputs a result. The worker-core application takes care of all interactions with the queueing mechanism.

The main method that a developer is required to implement is doWork(). It is called by the worker-core application once per task, and its return value is expected to be the worker output data. Other methods that require implementation are:

  • getWorkerIdentifier which should uniquely identify the worker type
  • getWorkerApiVersion which should identify the worker input and output API version (task and result)
  • getGeneralFailureResult which should return a general failure result in case of an unhandled exception

There is a base implementation of the Worker interface, the AbstractWorker. This class provides default implementations of all abstract methods except doWork. It also includes:

  • utility functions for creating responses
  • default exception handling
  • a property for the worker task

It should be used as a starting point whenever possible.
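To make the Worker contract more concrete, here is a minimal sketch with a simplified stand-in interface. The method shapes (String results, an int API version, a Throwable parameter), the ReverseWorker class and the execute helper are assumptions for illustration; in particular, execute only approximates how a caller such as worker-core might fall back to getGeneralFailureResult, and does not use AbstractWorker.

```java
final class WorkerSketch {

    /** Simplified stand-in for the Worker contract (assumed shapes, not the real signatures). */
    interface Worker {
        String doWork() throws Exception;

        String getWorkerIdentifier();

        int getWorkerApiVersion();

        String getGeneralFailureResult(Throwable cause);
    }

    /** Hypothetical worker that reverses the text of its task. */
    static final class ReverseWorker implements Worker {
        private final String task;

        ReverseWorker(String task) {
            this.task = task;
        }

        @Override
        public String doWork() {
            if (task == null) {
                throw new IllegalArgumentException("task data is missing");
            }
            return new StringBuilder(task).reverse().toString();
        }

        @Override
        public String getWorkerIdentifier() {
            return "ReverseWorker";
        }

        @Override
        public int getWorkerApiVersion() {
            return 1;
        }

        @Override
        public String getGeneralFailureResult(Throwable cause) {
            return "FAILED: " + cause.getMessage();
        }
    }

    /** Assumed caller behaviour: run the worker and use the general failure result on unhandled exceptions. */
    static String execute(Worker worker) {
        try {
            return worker.doWork();
        } catch (Exception e) {
            return worker.getGeneralFailureResult(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(execute(new ReverseWorker("abc")));
        System.out.println(execute(new ReverseWorker(null)));
    }
}
```

Because the worker holds only the task it was constructed with, creating one instance per task keeps it stateless between tasks.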

Data Store

Workers often need to interact with external data storage. The Worker Framework supports this by providing the DataStore interface. A worker can use it to retrieve and store binary data identified by name. Implementations are pluggable, and the following are provided out of the box:

  • FileSystem: worker-store-fs
  • AWS S3: worker-store-s3

To use a different technology, developers are required to implement the DataStore interface as well as a DataStoreProvider to support service location.
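A custom storage plug-in could follow the shape sketched below. The DataStore and DataStoreProvider interfaces here are simplified stand-ins with assumed method signatures (store and retrieve on byte arrays), and the in-memory implementation is purely illustrative; a real plug-in would talk to an actual storage technology.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class DataStoreSketch {

    /** Simplified stand-in: stores and retrieves binary data identified by name. */
    interface DataStore {
        void store(String name, byte[] data);

        byte[] retrieve(String name);
    }

    /** Simplified stand-in for the provider used by service location. */
    interface DataStoreProvider {
        DataStore getDataStore();
    }

    /** Hypothetical implementation that keeps everything in memory. */
    static final class InMemoryDataStore implements DataStore {
        private final Map<String, byte[]> blobs = new ConcurrentHashMap<>();

        @Override
        public void store(String name, byte[] data) {
            // Copy the array so later changes by the caller do not affect stored data.
            blobs.put(name, data.clone());
        }

        @Override
        public byte[] retrieve(String name) {
            byte[] data = blobs.get(name);
            if (data == null) {
                throw new IllegalArgumentException("No data stored under name: " + name);
            }
            return data.clone();
        }
    }

    /** Parameterless constructor so that ServiceLoader could instantiate the provider. */
    static final class InMemoryDataStoreProvider implements DataStoreProvider {
        public InMemoryDataStoreProvider() {
        }

        @Override
        public DataStore getDataStore() {
            return new InMemoryDataStore();
        }
    }

    public static void main(String[] args) {
        DataStore store = new InMemoryDataStoreProvider().getDataStore();
        store.store("report.txt", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(store.retrieve("report.txt"), StandardCharsets.UTF_8));
    }
}
```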

Messaging Queue

Workers use message queues to receive work and respond with results. All interactions are handled internally by the framework. Out of the box, the Worker Framework uses RabbitMQ, but this is a pluggable mechanism. Developers wishing to use a different technology are required to implement the ManagedWorkerQueue interface. A WorkerQueueProvider implementation is also required to support the service location mechanism.

Configuration

In most cases, workers require some configuration to be provided to them. The Worker Framework supports this by providing the ConfigurationSource interface. It has a single method, getConfiguration, that a worker can use to retrieve a particular configuration type. A configuration type is identified by a Java class. The ConfigurationSource retrieves a binary representation of this class from the underlying source and de-serializes it. The out-of-box implementation provided with the framework is:

To use a different source, developers are required to provide an implementation of the ManagedConfigurationSource and ConfigurationSourceProvider.
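The lookup described above (a configuration type identified by a Java class, fetched as bytes and then de-serialized) can be sketched as follows. The ConfigurationSource interface is a simplified stand-in with an assumed signature, and InMemoryConfigurationSource, its register method and ExampleConfiguration are hypothetical; the framework's own sources and the ManagedConfigurationSource contract are not shown.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class ConfigurationSourceSketch {

    /** Simplified stand-in for the single-method configuration contract. */
    interface ConfigurationSource {
        <T> T getConfiguration(Class<T> type);
    }

    /** Hypothetical configuration type. */
    static final class ExampleConfiguration {
        final String outputQueue;

        ExampleConfiguration(String outputQueue) {
            this.outputQueue = outputQueue;
        }
    }

    /** Hypothetical source that holds raw bytes per configuration class. */
    static final class InMemoryConfigurationSource implements ConfigurationSource {
        private final Map<Class<?>, byte[]> rawData = new HashMap<>();
        private final Map<Class<?>, Function<byte[], ?>> deserializers = new HashMap<>();

        /** Registers the binary form of a configuration and the function that decodes it. */
        <T> void register(Class<T> type, byte[] data, Function<byte[], T> deserializer) {
            rawData.put(type, data.clone());
            deserializers.put(type, deserializer);
        }

        @Override
        public <T> T getConfiguration(Class<T> type) {
            byte[] data = rawData.get(type);
            if (data == null) {
                throw new IllegalStateException("No configuration for " + type.getName());
            }
            // De-serialize on request and cast to the requested configuration class.
            return type.cast(deserializers.get(type).apply(data));
        }
    }

    public static void main(String[] args) {
        InMemoryConfigurationSource source = new InMemoryConfigurationSource();
        source.register(ExampleConfiguration.class,
                "results-queue".getBytes(StandardCharsets.UTF_8),
                bytes -> new ExampleConfiguration(new String(bytes, StandardCharsets.UTF_8)));
        System.out.println(source.getConfiguration(ExampleConfiguration.class).outputQueue);
    }
}
```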

Encryption of Configuration

Configuration data can be encrypted. This is achieved through the use of a Cipher. By default, the worker-core application uses the NullCipher implementation, which does not encrypt or decrypt any data. The framework also includes the JasyptCipher, which provides basic text encryption. Developers can supply their own encryption mechanism by providing implementations of the Cipher and the CipherProvider.
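A custom encryption mechanism might be structured like the sketch below. The ConfigCipher interface is a hypothetical stand-in (named differently to avoid clashing with javax.crypto.Cipher) and its string-based methods are an assumption about the contract. PassThroughCipher mirrors the do-nothing behaviour described for NullCipher, and AesGcmCipher is an example built on the standard javax.crypto API; it is unrelated to JasyptCipher.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

final class ConfigCipherSketch {

    /** Hypothetical stand-in for the framework's cipher contract. */
    interface ConfigCipher {
        String encrypt(String plainText);

        String decrypt(String cipherText);
    }

    /** Leaves data untouched, like the default behaviour described above. */
    static final class PassThroughCipher implements ConfigCipher {
        @Override
        public String encrypt(String plainText) {
            return plainText;
        }

        @Override
        public String decrypt(String cipherText) {
            return cipherText;
        }
    }

    /** AES-GCM text cipher; output is Base64 of the random IV followed by the ciphertext. */
    static final class AesGcmCipher implements ConfigCipher {
        private static final int IV_BYTES = 12;
        private static final int TAG_BITS = 128;
        private final SecretKeySpec key;
        private final SecureRandom random = new SecureRandom();

        AesGcmCipher(byte[] keyBytes) {
            this.key = new SecretKeySpec(keyBytes, "AES");
        }

        @Override
        public String encrypt(String plainText) {
            try {
                byte[] iv = new byte[IV_BYTES];
                random.nextBytes(iv);
                Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
                cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
                byte[] encrypted = cipher.doFinal(plainText.getBytes(StandardCharsets.UTF_8));
                byte[] out = ByteBuffer.allocate(iv.length + encrypted.length).put(iv).put(encrypted).array();
                return Base64.getEncoder().encodeToString(out);
            } catch (GeneralSecurityException e) {
                throw new IllegalStateException("Encryption failed", e);
            }
        }

        @Override
        public String decrypt(String cipherText) {
            try {
                byte[] in = Base64.getDecoder().decode(cipherText);
                Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
                // The first IV_BYTES bytes of the input are the IV written by encrypt.
                cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, in, 0, IV_BYTES));
                byte[] plain = cipher.doFinal(in, IV_BYTES, in.length - IV_BYTES);
                return new String(plain, StandardCharsets.UTF_8);
            } catch (GeneralSecurityException e) {
                throw new IllegalStateException("Decryption failed", e);
            }
        }
    }

    public static void main(String[] args) {
        // Demo key only; a real key would come from a secure source.
        ConfigCipher cipher = new AesGcmCipher("0123456789abcdef".getBytes(StandardCharsets.UTF_8));
        String encrypted = cipher.encrypt("secret-password");
        System.out.println(cipher.decrypt(encrypted));
    }
}
```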

Decoding Configuration Files

The way in which configuration files are read can be further controlled through the use of a specific Decoder. Decoders can be used to add parsing functionality for the contents of configuration. The decoder used is controlled by the CAF_CONFIG_DECODER environment variable. If this variable is not specified, the supplied Codec is used instead to read the contents of configuration.

An example of a decoder that adds parsing functionality is the JavaScript Decoder, which adds support for reading configuration written in JavaScript files. The decoder also provides a getenv method, which can be used in the configuration definition to access environment variables when the configuration is read.

e.g.

({
    name: getenv("MY_STRING") || "Default name"
});

In the above example, the name property will be the value of the environment variable MY_STRING or, if that environment variable is not present at runtime, “Default name”.

Developers can create and use their own decoder by providing an implementation of the Decoder and providing the name of the implementation class in the environment variable CAF_CONFIG_DECODER.

For example, to use the JavaScript decoder, CAF_CONFIG_DECODER should be set to JavascriptDecoder.
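The selection rule and the fallback in the configuration example can be expressed in Java roughly as follows. This is a sketch only: the helper names are hypothetical, the environment is passed in as a map so the logic can be exercised without real environment variables, and treating a blank CAF_CONFIG_DECODER value as unspecified is an assumption.

```java
import java.util.Map;

final class DecoderSelectionSketch {

    static final String DECODER_VARIABLE = "CAF_CONFIG_DECODER";

    /** Returns the configured decoder name, or the codec's name when the variable is not set. */
    static String chooseDecoderName(Map<String, String> environment, String codecName) {
        String configured = environment.get(DECODER_VARIABLE);
        if (configured == null || configured.trim().isEmpty()) {
            return codecName;
        }
        return configured.trim();
    }

    /** Java analogue of the JavaScript expression getenv("MY_STRING") || "Default name". */
    static String nameProperty(Map<String, String> environment) {
        String value = environment.get("MY_STRING");
        // In JavaScript, || also falls back when the left side is an empty string.
        return (value == null || value.isEmpty()) ? "Default name" : value;
    }

    public static void main(String[] args) {
        Map<String, String> environment = System.getenv();
        System.out.println("Decoder: " + chooseDecoderName(environment, "JsonCodec"));
        System.out.println("Name: " + nameProperty(environment));
    }
}
```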

Serialization

The framework operates on messages that carry a payload with all the information required for processing. This information needs to be serialized and de-serialized before it can be used by worker-core or a Worker. The Worker Framework takes care of that internally using the Codec interface. This interface provides format-agnostic methods for serializing and de-serializing an object. The following codecs are provided with the framework:

  • JsonCodec for the JSON format
  • JsonLzfCodec for the JSON format compressed with a high-speed LZF algorithm

To use a different format, a developer has to implement the Codec and CodecProvider.
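A codec for another format could be shaped like the sketch below. The Codec and CodecProvider interfaces are simplified stand-ins with assumed method names and signatures, and the example uses built-in Java serialization only because it needs no extra libraries; it is not one of the framework's codecs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

final class CodecSketch {

    /** Simplified stand-in for a format-agnostic codec contract. */
    interface Codec {
        byte[] serialize(Object value);

        <T> T deserialize(byte[] data, Class<T> type);
    }

    /** Simplified stand-in for the provider used by service location. */
    interface CodecProvider {
        Codec getCodec();
    }

    /** Hypothetical codec backed by standard Java serialization. */
    static final class JavaSerializationCodec implements Codec {
        @Override
        public byte[] serialize(Object value) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            // The object stream is closed (and flushed) before the bytes are read.
            return buffer.toByteArray();
        }

        @Override
        public <T> T deserialize(byte[] data, Class<T> type) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
                return type.cast(in.readObject());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    /** Parameterless constructor so that ServiceLoader could instantiate the provider. */
    static final class JavaSerializationCodecProvider implements CodecProvider {
        public JavaSerializationCodecProvider() {
        }

        @Override
        public Codec getCodec() {
            return new JavaSerializationCodec();
        }
    }

    public static void main(String[] args) {
        Codec codec = new JavaSerializationCodecProvider().getCodec();
        byte[] bytes = codec.serialize("task-42");
        System.out.println(codec.deserialize(bytes, String.class));
    }
}
```

Java serialization should not be applied to untrusted input, so a production codec would more likely use a data format such as JSON.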