alemca Module

The alemca module lets Lua scripts interact with the Alemca service. It provides functions to publish and read messages, batch and compress data, report metrics, check the connection, and schedule commands.

To import it, use the following declaration:

local alemca = require("alemca")

Configuration

The alemca module requires specific configuration to function. This configuration is defined in the agent's configuration file.

Here is an example with all possible options:

iot:
  alemca: # connection configuration for alemca
    max_cache_size: 1024 # maximum cache size in bytes
    destinations: # message destinations
      - name: "destination_name" # destination name
        exchange: "destination_name" # exchange name
        routing_key: "destination_name" # routing key
        queue_not_durable: false # non-durable queue
        exchange_type: "direct" # exchange type

Configuration Details

  • max_cache_size: Maximum cache size in bytes. If the cache exceeds this size, the oldest messages will be deleted.
  • destinations: List of message destinations. Each destination defines a name, an exchange, a routing key, and optional settings such as queue durability and exchange type:
    • name: Name of the destination. Used to identify it in Lua code.
    • exchange: Exchange name. Used to publish messages.
    • routing_key: Routing key. Used to direct messages to the correct queue.
    • queue_not_durable: Indicates whether the queue is non-durable. If true, the queue does not persist after a restart.
    • exchange_type: Exchange type. Can be direct, fanout, topic, or headers. Defines how messages are routed to queues.

Lua Functions

Function Table

| Function | Lua signature | Role | Return (success) | Return (failure) |
| --- | --- | --- | --- | --- |
| write | bool, err = alemca.write(routingKey, payload [, headers]) | Immediately publishes payload to the queue routingKey. | true, nil | nil, "msg" |
| writebatch | bool, err = alemca.writebatch(payload) | Adds payload to disk cache amqp.cache. | true, nil | nil, "msg" |
| sendbatch | bool, err = alemca.sendbatch(routingKey [, unique=false [, compress=false [, headers]]]) | Flushes the cache to routingKey. unique=true sends line by line. compress=true applies gzip. | true, nil | nil, "msg" |
| compress | compressed, err = alemca.compress(str) | Returns str compressed (gzip + base64). | string, nil | nil, "msg" |
| status | connected, err = alemca.status() | Checks the AMQP connection. | true, nil | false, "msg" |
| getbatch | len, err = alemca.getbatch() | Number of lines in amqp.cache. | n, nil | nil, "msg" |
| clearbatch | bool, err = alemca.clearbatch() | Purges the cache. | true, nil | nil, "msg" |
| read | msg, err = alemca.read(queueName) | Blocks up to 5 seconds, returns next message from queueName. | string, nil | nil, "msg" |
| writemetrics | err = alemca.writemetrics(metrics [, types [, units [, tags [, ident]]]]) | Converts metrics table to JSON and writes it to metrics.cache. | nil | "msg" |
| pushmetrics | err = alemca.pushmetrics() | Sends metrics.tmp content to data_metrics_ex. | nil | "msg" |
| newcommand | bool, err = alemca.newcommand(cmdTable) | Schedules delayed execution of a remote (Lua) script. | true, nil | nil, "msg" |
| get_entity | entity, err = alemca.get_entity([ident]) | Retrieves the Core entity. | luaTable, nil | nil, "msg" |
| get_children | children, err = alemca.get_children([ident]) | Recursively lists child entities. | luaTable, nil | nil, "msg" |
| Fields | alemca.ident, alemca.token, alemca.name, alemca.type | Direct access to loaded metadata. | | |

alemca.write(routing_key, message[, headers])

Publishes a message to Alemca.

  • Parameters:
  • routing_key (string): Routing key for the message.
  • message (string): Message to send.
  • headers (table, optional): Optional additional headers.

  • Return:

  • true on success; on failure, nil and an error message.

local ok, err = alemca.write("uc.raw.temp", '{"t":25.4,"u":"°C"}')
assert(ok, err)

alemca.writebatch(message)

Adds a message to the batch cache.

  • Parameters:
  • message (string): Message to add to the batch.

  • Return:

  • true on success; on failure, nil and an error message.

alemca.writebatch('{"ts":1680000000,"v":12}')
alemca.writebatch('{"ts":1680000005,"v":13}')

-- later …
local ok, err = alemca.sendbatch("uc.raw.series", false, true)
assert(ok, err)

alemca.sendbatch(routing_key[, send_mode[, compress_mode[, headers]]])

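Sends the messages held in the batch cache to Alemca, flushing the cache.

  • Parameters:
  • routing_key (string): Routing key the batch is sent to.
  • send_mode (boolean, optional, default false): If true, the cache is sent line by line.
  • compress_mode (boolean, optional, default false): If true, gzip compression is applied.
  • headers (table, optional): Optional additional headers.

  • Return:

  • true on success; on failure, nil and an error message.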

alemca.writebatch('{"ts":1680000000,"v":12}')
alemca.writebatch('{"ts":1680000005,"v":13}')

-- later …
local ok, err = alemca.sendbatch("uc.raw.series", false, true)
assert(ok, err)

alemca.compress(message)

Compresses a message using GZIP and encodes it in base64.

  • Parameters:
  • message (string): The message to compress.

  • Return:

  • compressed (string): The compressed and base64-encoded message.
  • err (string or nil): Error message if compression fails.
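
As a minimal sketch of how compress might be combined with write, the helper below compresses a payload and publishes the result. The helper name publish_compressed and the encoding header are illustrative assumptions and not part of the module. The module is passed in as an argument so the snippet can also be exercised outside the agent.

```lua
-- Hypothetical helper (not part of the alemca module):
-- compress a payload, then publish the compressed string.
-- `alemca` is the table returned by require("alemca").
local function publish_compressed(alemca, routing_key, payload)
  local compressed, err = alemca.compress(payload)
  if not compressed then
    return nil, "compress failed: " .. tostring(err)
  end
  -- The header name and value are assumptions for illustration only.
  return alemca.write(routing_key, compressed, { encoding = "gzip+base64" })
end
```

Inside the agent this would be called as publish_compressed(require("alemca"), "uc.raw.temp", payload), reusing the routing key from the write example.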

alemca.status()

Checks the current connection status with the Alemca AMQP service.

  • Parameters: None

  • Return:

  • connected (boolean): true if connected, false otherwise.
  • err (string or nil): Error message if not connected.
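
One possible use of status is to choose between publishing immediately and caching for later. The sketch below assumes that writebatch still works while the connection is down, since it writes to the local disk cache; that behavior is not confirmed here. The helper name write_or_cache is hypothetical.

```lua
-- Hypothetical helper (not part of the alemca module):
-- publish right away when connected, otherwise keep the message
-- in the batch cache so a later sendbatch can deliver it.
local function write_or_cache(alemca, routing_key, payload)
  local connected = alemca.status()
  if connected then
    return alemca.write(routing_key, payload)
  end
  -- Assumption: the disk cache accepts messages while disconnected.
  return alemca.writebatch(payload)
end
```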

alemca.getbatch()

Returns the number of messages currently stored in the local disk cache.

  • Parameters: None

  • Return:

  • len (number): Number of lines/messages in the cache.
  • err (string or nil): Error message if retrieval fails.
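
The count returned by getbatch can drive a simple flush policy. The following sketch sends the cache once it holds a given number of lines; the helper name flush_if_full and the threshold parameter are hypothetical, and the sendbatch arguments mirror the existing batch example.

```lua
-- Hypothetical helper (not part of the alemca module).
-- Returns true if a batch was sent, false if the cache is still below
-- the threshold, or nil plus an error message on failure.
local function flush_if_full(alemca, routing_key, threshold)
  local pending, err = alemca.getbatch()
  if not pending then
    return nil, err
  end
  if pending < threshold then
    return false
  end
  -- Second argument false: do not send line by line.
  -- Third argument true: apply gzip compression.
  local ok, send_err = alemca.sendbatch(routing_key, false, true)
  if not ok then
    return nil, send_err
  end
  return true
end
```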

alemca.clearbatch()

Clears all messages from the local disk cache.

  • Parameters: None

  • Return:

  • ok (boolean): true if successful.
  • err (string or nil): Error message if the operation fails.
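
Because clearbatch discards cached messages, it may be useful to record how many were dropped. This is only a sketch: the helper name drop_batch is hypothetical, and it assumes nothing is written to the cache between the two calls.

```lua
-- Hypothetical helper (not part of the alemca module):
-- purge the batch cache and return how many lines it held beforehand.
local function drop_batch(alemca)
  local pending, err = alemca.getbatch()
  if not pending then
    return nil, err
  end
  local ok, clear_err = alemca.clearbatch()
  if not ok then
    return nil, clear_err
  end
  return pending
end
```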

alemca.read(routing_key)

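Reads the next message from a queue, blocking for up to 5 seconds.

  • Parameters:
  • routing_key (string): Name of the queue to read from.

  • Return:

  • msg (string): The next message from the queue, or nil on failure.
  • err (string or nil): Error message if the read fails.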

local msg, err = alemca.read("alerts")
if msg then
  print("ALERT:", msg)
else
  print("no alert:", err)
end
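
To process several pending messages, read can be called in a bounded loop. The sketch below assumes that read returns nil plus a message when nothing arrives within the timeout, as the example above suggests; the helper name drain and its handler callback are hypothetical.

```lua
-- Hypothetical helper (not part of the alemca module):
-- read up to max_messages from a queue and pass each one to handler.
-- Returns the number of messages handled, plus the error that ended
-- the loop early (if any).
local function drain(alemca, queue_name, max_messages, handler)
  local handled = 0
  while handled < max_messages do
    local msg, err = alemca.read(queue_name)
    if not msg then
      -- Assumption: nil covers both "no message in time" and real errors.
      return handled, err
    end
    handler(msg)
    handled = handled + 1
  end
  return handled
end
```

Bounding the loop keeps a script from blocking indefinitely on a busy queue, since each read may wait for several seconds.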

alemca.writemetrics(metrics[, types[, units[, tags[, ident]]]])

Writes metrics to the cache.

  • Parameters:
  • metrics (table): Table containing metrics to write.
  • types (table, optional): Table specifying the types of each metric.
  • units (table, optional): Table specifying the units of each metric.
  • tags (table, optional): Table specifying tags for the metrics.
  • ident (string, optional): Identifier for the metrics.

  • Return:

  • err (string or nil): nil on success, or an error message if the operation fails.

alemca.writemetrics(
  { cpu = 17.3, heap = 11264 },
  { cpu = "float", heap = "int" },
  { cpu = "percent", heap = "kB" },
  { site = "press-01", env = "prod" }
)

alemca.pushmetrics()

Forces a flush of metrics from the temporary cache to the Alemca service.

  • Parameters: None

  • Return:

  • err (string or nil): nil on success, or an error message if the operation fails.
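
Unlike most functions in this module, writemetrics and pushmetrics return only an error value, so a nil result means success. The sketch below wraps both calls to give the usual ok, err shape; the helper name report_metrics is hypothetical.

```lua
-- Hypothetical helper (not part of the alemca module):
-- write a metrics table to the cache, then push it to the service.
-- Returns true on success, or nil plus an error message.
local function report_metrics(alemca, metrics)
  -- Both calls return nil on success and an error message on failure.
  local err = alemca.writemetrics(metrics)
  if err then
    return nil, "writemetrics: " .. err
  end
  err = alemca.pushmetrics()
  if err then
    return nil, "pushmetrics: " .. err
  end
  return true
end
```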

alemca.newcommand(command)

Creates a new command that schedules the delayed execution of a remote Lua script.

  • Parameters:
  • command (table): Table containing command details.

    • script (string): Name of the script to execute.
    • job_ident (string): Unique identifier for the job.
    • variables (table, optional): Variables to pass to the script.
    • constants (table, optional): Constants to pass to the script.
    • attributes (table, optional): Attributes for the command.
    • scheduled_time (number, optional): Scheduled execution time (in seconds since epoch).
    • expire_time (number, optional): Expiration time (in seconds since epoch).
    • cancel (string, optional): Identifier for canceling the command.

  • Return:

  • ok (boolean): true if the command was created successfully.
  • err (string or nil): Error message if the operation fails.

local cmd = {
  script = "reboot.lua",
  job_ident = "reboot-2025-04-30-02h",
  variables = { delay = 10 },
  constants = {},
  attributes = { reason = "nightly-maintenance" },
  scheduled_time = os.time() + 3600,   -- in 1 hour
  expire_time = os.time() + 86400,     -- expires in 24 hours
  cancel = ""
}

local ok, err = alemca.newcommand(cmd)
assert(ok, err)

alemca.get_entity([ident])

Fetches the Core entity.

  • Parameters:
  • ident (string, optional): Identifier of the entity to fetch. If not provided, the default entity is returned.

  • Return:

  • entity (table): Table containing entity details.
    • ident (string): Identifier of the entity.
    • name (string): Name of the entity.
    • type (string): Type of the entity.
    • attributes (table): Attributes of the entity.
    • tags (table): Tags associated with the entity.
  • err (string or nil): Error message if the operation fails.

local entity, err = alemca.get_entity("my_entity")
if entity then
  print("Entity:", entity.name)
else
  print("Error:", err)
end

alemca.get_children([ident])

Fetches children of the Core entity.

  • Parameters:
  • ident (string, optional): Identifier of the entity to fetch children from. If not provided, the default entity is used.

  • Return:

  • children (table): Table containing child entities.
    • Each child entity has the same structure as the entity returned by get_entity().
  • err (string or nil): Error message if the operation fails.

local children, err = alemca.get_children("my_entity")

if children then
  for _, child in ipairs(children) do
    print("Child:", child.name)
  end
else
  print("Error:", err)
end

Associated Lua Fields

  • alemca.ident: The Alemca instance identifier.
  • alemca.token: The Alemca instance token.
  • alemca.name: The Alemca instance name.
  • alemca.type: The Alemca instance type.
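
These fields are read as plain values, not called as functions. As a small sketch, the helper below formats them into a log-friendly string; the name describe_instance is hypothetical, and the token is deliberately left out so it does not end up in logs.

```lua
-- Hypothetical helper (not part of the alemca module):
-- build a short description from the module's metadata fields.
-- tostring() guards against fields that are not loaded (nil).
local function describe_instance(alemca)
  return string.format(
    "%s (%s) ident=%s",
    tostring(alemca.name),
    tostring(alemca.type),
    tostring(alemca.ident)
  )
end
```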