---
parent: /workflows/agents.md
back_out_url: /workflows/agents.md
---

# Workflow 16: Build an Analytics Pipeline (Processors)

> You are at https://api.litmus.io/workflows/build-analytics-pipeline.md
> Parent: https://api.litmus.io/workflows/agents.md
> If this is the wrong workflow, back out to the index above.

## 16. Build an Analytics Pipeline

**UI trigger**: Analytics -> open a group -> drag processors onto the canvas -> wire them -> Save.

The Analytics module runs **pipelines of processors**. Each processor is one node. Data flows along **wires** between nodes. The whole module is GraphQL at `POST {{edgeUrl}}/analytics/v3`.

---

## THE ONE RULE (read this before creating anything)

**A pipeline is always SOURCE -> TRANSFORM(s) -> SINK, wired together.**

A single transform processor (JSONata, Expression, Moving Average, a KPI, ...) does **nothing** on its own. It has no data coming in and nowhere to send the result. This is the single most common mistake: creating a lone `JSONata` or `Expression` node with no input and no output.

Minimum viable pipeline = **3 processors + 2 wires**:

```
[SOURCE]  --values-->  [TRANSFORM]  --values-->  [SINK]
```

If you only need to move data with no math, you still need a SOURCE and a SINK (e.g. `DataHub Subscribe -> DataHub Publish`).

### Exceptions: nodes that can run without an input wire

Some processors do not need an upstream wire. There are **two** mechanisms, both visible in the processor's `Get All (Supported) Processors Metadata` entry. Check for them before assuming a node needs an input.

**1. `Invoke` feature** -- `Features` array contains `Invoke`. Driven on demand (manually or scheduled) through the `Invoke Processor` endpoint, so it can be the first node or a standalone node. Live-verified set in LE 4.0.x:

| Processor | Role | On invoke it... |
|---|---|---|
| `Inject` | SOURCE (`outputOnly`) | Emits a fixed payload |
| `Database Batch Input` | SOURCE (`outputOnly`) | Pulls a batch of rows from the historian |
| `FileReader` | TRANSFORM (`inputOutput`) | Reads a file |
| `Delay/Rate-Limit` | TRANSFORM (`inputOutput`) | Releases buffered data on a schedule/trigger |
| `AI Responses` | TRANSFORM (`inputOutput`) | Calls the configured AI model on demand |

**2. Config-supplied data source** -- a parameter that provides the data directly, making the input wire optional. Read the parameter labels for an "leave empty to use input" style hint. Live example:

| Processor | Parameter | Behaviour |
|---|---|---|
| `AI Vision` | `file_path_for_image` | "File Path for the Image. Leave empty to use input message as the image." Set it -> reads the image from disk, no input wire needed. Leave empty -> expects an input message. |

So `AI Vision` works **both** ways: standalone (file path set) or wired (file path empty). Plain `AI` (text) has no such source parameter, so it needs an input message. Note also that the AI processors expose a `timerInterval` parameter that pushes output on a timer.

The takeaway: do not infer "needs input" from `ConnectDirection` alone. Check `Features` for `Invoke` AND scan the `Parameters` for a config-supplied source. Everything without either still follows THE ONE RULE.

---

## How to tell a processor's role: `ConnectDirection`

Every processor in `Get All (Supported) Processors Metadata` carries a `ConnectDirection` field. That field, not the name, tells you the role:

| `ConnectDirection` | Role | Meaning | Use it as |
|---|---|---|---|
| `outputOnly` | **SOURCE** | Produces data, accepts no input wire | First node in the pipeline |
| `inputOnly` | **SINK** | Consumes data, produces no output wire | Last node in the pipeline |
| `inputOutput` | **TRANSFORM** | Both: takes input, emits output | The middle (one or many) |

### The actual catalog (LE 4.0.x, 56 processors, verified live against `GetProcessorsMetadata`)

| Role | Processors |
|---|---|
| **SOURCE** (`outputOnly`) | `DataHub Subscribe`, `Generator`, `Inject`, `Database Batch Input` |
| **SINK** (`inputOnly`) | `DataHub Publish`, `Database Output`, `StdOut`, `FileWriter`, `ImagePreview` |
| **TRANSFORM** (`inputOutput`) | everything else (47): reshape/script (`JSONata`, `Expression`, `JavaScript processor`, `Tengo script processor`, `Conversion`, `Base Conversion`, `Rounding`, `Switch`, `Delay/Rate-Limit`), windows/stats (`Moving Average/Maximum/Minimum`, `Simple Window`, `Statistical Functions`, `Gaussian Filter`, `Normalization`, `Signal Decomposition`, `Feature Extractor`), multi-input combine (`Combination Processor`, `Join Processor`, `Inputs Sum/Average/Maximum/Minimum`, `Change`, `Change of Value`), predict/anomaly (`Anomaly Detection`, `ARIMA Filter`, `Linear Prediction`, `Statistical Prediction`, `SPC Charts`, `XMR Chart`, `Rise and Fall`), KPI/OEE (`Production Time/ CTR`, `Manufacture Counter`, `Up/DownTime by Value`, `Asset Online Percentage`, `Online and Offline`, `Compliance and Loss`, `Maintenance/Failure`, `PLC Uptime/Downtime Tracker`), AI/ML (`AI`, `AI Vision`, `AI Responses`, `TensorFlow processor`, `TensorFlow images processor`) |

> The catalog is device-specific. Call `Get All (Supported) Processors Metadata` to get the live list and each processor's exact `Parameters`, defaults, and enums. Do not assume a processor exists; confirm it in the metadata response.

---

## What to use when (picker)

| You want to... | Use (role) |
|---|---|
| Subscribe to a DataHub / MQTT topic | `DataHub Subscribe` (SOURCE) |
| Read rows from the historian | `Database Batch Input` (SOURCE) |
| Generate test data | `Generator` or `Inject` (SOURCE) |
| Extract / rename / reshape one or more fields | `JSONata` or `Expression` (TRANSFORM) |
| Run custom logic | `JavaScript processor` or `Tengo script processor` (TRANSFORM) |
| Smooth / window / aggregate | `Moving Average`, `Simple Window`, `Statistical Functions` (TRANSFORM) |
| Detect anomalies / control charts | `Anomaly Detection`, `SPC Charts`, `XMR Chart` (TRANSFORM) |
| Forecast | `ARIMA Filter`, `Linear Prediction`, `Statistical Prediction` (TRANSFORM) |
| Compute OEE / uptime / counts | KPI group: `Up/DownTime by Value`, `Online and Offline`, `Manufacture Counter`, `Production Time/ CTR` (TRANSFORM) |
| Publish a result to a topic | `DataHub Publish` (SINK) |
| Persist to the historian | `Database Output` (SINK) |

---

## Build procedure (the API calls)

| Step | API Name | Operation | Notes |
|------|----------|-----------|-------|
| 0 | Get All (Supported) Processors Metadata | `GetProcessorsMetadata` | Discover each processor's `Parameters` (keys, types, defaults, enums). Cache it. For a hand-curated snapshot of all 56 LE 4.0.x processors with description, params, and required wire definitions, see /reference/analytics-processors.md; still call live metadata for exact defaults and enums. |
| 1 | Create New Processor (SOURCE) | `CreateProcessorInstance` | `Function` = source name, `Parameters` = its config (e.g. topic). Capture returned `ID`. |
| 2 | Create New Processor (TRANSFORM) | `CreateProcessorInstance` | One per transform. Capture each `ID`. |
| 3 | Create New Processor (SINK) | `CreateProcessorInstance` | Capture `ID`. |
| 4 | Connect Processor Instances | `ConnectProcessorInstances` | `Inputs: [upstream/producer IDs]`, `Outputs: [downstream/consumer IDs]` -- **the fields are the reverse of what the names suggest**: `Inputs` is where the data comes FROM, `Outputs` is where it goes TO. `ConnectionType` is **plural** (`"values"`/`"events"`; singular is rejected). A wire feeding a SINK (`DataHub Publish`, `Database Output`, ...) must be `"events"`; a source -> transform wire may be `"values"` or `"events"`. One call per wire (or batch). |
| 5 | Set Processor Wire Definition (optional) | `DefineProcessorInstanceInput` | Only when a transform needs to know **which field** of the upstream output to read. `Id` = downstream, `InputId` = upstream. |

Notes:
- `CreateProcessorInstance` accepts `ValueInputs` / `EventInputs` / `Outputs` arrays, so you **can** pre-wire at create time instead of calling Connect. Either approach is valid; Connect is clearer step-by-step.
- Wires are typed and the type is **plural**: `values` or `events` (the API rejects singular `value`/`event`). A wire whose consumer is a SINK (`DataHub Publish`, `Database Output`, `StdOut`, ...) must be `events` -- the publish/output side is **never** `values`. A source -> transform wire may be `values` or `events`.
- `Connect Processor Instances` `Inputs`/`Outputs` are **reversed** from intuition: `Inputs` lists the upstream producer(s) (the data source), `Outputs` lists the downstream consumer(s) (the destination). Verified live: to wire `source -> transform`, send `Inputs: [source]`, `Outputs: [transform]`.
- `Active: true` starts the processor immediately; `false` creates it stopped.

---

## Recipe A: Subscribe to a topic, publish one key

Task: "Subscribe to topic `ABC` and publish only one field out of it."

Pipeline: `DataHub Subscribe(ABC) -> JSONata(extract field) -> DataHub Publish(out)`

| Step | Processor | Function | Key Parameters |
|------|-----------|----------|----------------|
| 1 | SOURCE | `DataHub Subscribe` | `topic = ABC` |
| 2 | TRANSFORM | `JSONata` | object-construction over the **flat** DataHub message, e.g. `{"value": value}` to keep just the `value` field. The incoming message has no `payload` wrapper -- its fields are `value`, `tagName`, `deviceName`, `timestamp`, `datatype`, ... Emit an object (not a bare scalar). Use `Expression` for a math/field op. |
| 3 | SINK | `DataHub Publish` | `topic = ABC.temperature` (NATS topics use `.`, not `/`) |
| 4 | wire | Connect | subscribe -> jsonata: `values` (or `events`); jsonata -> publish: `events` (sink wires are always events). Per call: `Inputs` = upstream node, `Outputs` = downstream node. |

Without steps 1 and 3 the JSONata node receives nothing and publishes nothing. This is the failure mode this page exists to prevent.

---

## Recipe B: Compute OEE

OEE = Availability x Performance x Quality. Each factor is its own sub-chain that ends in an `Expression` that multiplies the three.

Pipeline (one common shape):

```
DataHub Subscribe(machine state) --> Up/DownTime by Value ----\
DataHub Subscribe(part counts)   --> Manufacture Counter -----> Expression(A*P*Q) --> DataHub Publish(oee)
DataHub Subscribe(quality)       --> Compliance and Loss ----/                     \-> Database Output
```

| Step | Processor | Function | Purpose |
|------|-----------|----------|---------|
| 1a-1c | SOURCE x3 | `DataHub Subscribe` | One per data feed: machine state, part counts, quality flags |
| 2a | TRANSFORM | `Up/DownTime by Value` or `Online and Offline` | Availability factor |
| 2b | TRANSFORM | `Manufacture Counter` or `Production Time/ CTR` | Performance factor |
| 2c | TRANSFORM | `Compliance and Loss` (or `Expression`) | Quality factor |
| 3 | TRANSFORM | `Expression` | Multiply the three factors into the OEE value |
| 4 | SINK | `DataHub Publish` + `Database Output` | Publish live OEE and persist history |
| 5 | wire | Connect | source -> KPI and KPI -> Expression: `values` (or `events`); Expression -> each sink (`DataHub Publish` / `Database Output`): `events` |

Tip: put OEE in its own group (`Get All Groups` / create per group) so the line's processors stay isolated.

---

## Recipe C: Anomaly detection / SPC

Task: "Watch a sensor topic and flag anomalies / chart it."

Pipeline: `DataHub Subscribe -> Anomaly Detection (or SPC Charts / XMR Chart) -> DataHub Publish`

| Step | Processor | Function | Key Parameters |
|------|-----------|----------|----------------|
| 1 | SOURCE | `DataHub Subscribe` | `topic` |
| 2 | TRANSFORM | `Anomaly Detection` | `window size`, `deviations`, control-chart mode (see metadata) |
| 3 | SINK | `DataHub Publish` | output topic for the flag/score |
| 4 | wire | Connect | subscribe -> detector: `values` (or `events`); detector -> publish: `events` |

Swap `Anomaly Detection` for `SPC Charts` or `XMR Chart` for statistical process control. Same SOURCE/SINK requirement applies.

---

## One-shot alternative: author the whole group as JSON and Import

Instead of N `Create` + `Connect` calls, you can describe the entire pipeline as one `BackupItem` JSON and load it with **`Import Group`**. This is the best path for an LLM: emit one self-consistent graph, import it, done. To learn the exact shape, `Export Group` an existing group and copy the structure. All findings below are verified live on LE analytics 2.1.2.

- `Export Group`: `query ExportGroup($input: String!)` -- `input` = group name. Returns the `BackupItem` envelope.
- `Import Group`: `mutation ImportGroup($groupName: String!, $data: String!)` -- `data` is the `BackupItem` envelope **stringified** (JSON-encoded as a string).

**How Import behaves (verified):**

1. **Additive, per-group.** It creates the named group if new, replaces it if it already exists. Other groups are untouched -- it is NOT a full-state wipe.
2. **Placement is via `Positions`.** Every processor needs a `Positions[]` entry with `GroupName` = the target group, and pass the same `groupName` argument. With `Positions: []` the nodes fall into `default` regardless of `groupName`.
3. **Use fresh processor IDs** for a new group. Re-importing the *same* IDs into an *existing* group can fail with `errors during processor creation`. The IDs only need to be internally consistent (the references between processors must match).

### Envelope

```json
{
  "Processors": [ /* one object per node, see below */ ],
  "Positions":  [ { "ID": "<id>", "Top": 100, "Left": 20, "GroupName": "<group>" } ],
  "Version": "2.1.2",
  "Git": "",
  "AIModels":   [],
  "Variables":  [],
  "Parameters": []
}
```

`AIModels`/`Variables`/`Parameters` are usually `[]` (AI model API keys are never exported; re-add after import).

### Each processor

```json
{
  "ID": "<a UUID you assign>",
  "Name": "<display name>",
  "FunctionName": "<exact catalog name, e.g. JSONata>",
  "IsActive": true,
  "Config": { "settings": { "<paramKey>": "<value>" } },
  "InputEvents": [],
  "InputWaits": [],
  "InputsDefinitions": {},
  "State": null,
  "Outputs": []
}
```

- `FunctionName` must match the catalog exactly (from `GetProcessorsMetadata`).
- `Config.settings` keys are the processor's `Parameters` keys (e.g. `DataHub Subscribe` -> `topic`; `JSONata` -> `jsonata_schema`; `DataHub Publish` -> `topic`).

### Wiring rule (verified live -- this is the part bots get wrong)

Every wire is **mirrored on BOTH sides**. For a wire producer -> consumer:

- **Producer:** add the consumer's ID to the producer's `Outputs`. This is **required** -- a wire with only the consumer side set does not connect.
- **Consumer:** add the producer's ID to the consumer's `InputEvents` (event wires -- every wire feeding a SINK/publish is an event wire) **or** `InputWaits` (value/synchronized-combine inputs, e.g. into `Inputs Average`). `InputEvents` is the common case.
- **`InputsDefinitions`** = `{ "<slot>": "<producer ID>" }` only for processors that reference an input **by name** (e.g. `Expression` whose formula uses a `Celsius` variable -> `{ "Celsius": "<producer>" }`; multi-input KPIs like `Compliance and Loss` -> `manufactureEnd`, `defectiveUnit`). Ordinary single-input nodes (`JSONata`, `DataHub Publish`, `Database Output`) keep `InputsDefinitions: {}` -- the wire is just `Outputs` + `InputEvents`.

Setting only `InputsDefinitions` (or only one side) imports with **no functional wire**. `Export Group` round-trips whatever you send, so a bad payload can look wired in the export yet show disconnected nodes in the UI -- verify in the UI or by data flow, not by re-export.

### JSONata expressions

The input is the **flat** DataHub message (fields `value`, `tagName`, `deviceName`, `timestamp`, `datatype`, ... -- no `payload` wrapper). A JSONata schema is an **object construction**, not a path. To keep just one field as the pipeline value: `{"value": value}` (reads the top-level `value`, emits `{"value": ...}`). Do **not** write `payload.temperature` -- there is no `payload`, and a bare scalar is not a usable downstream message.

Full message shape, field types, and a JSONata cheatsheet: /reference/devicehub-message-format.md

### Verified working template (Subscribe -> JSONata -> Publish)

This exact document imports clean and produces a correctly wired, running pipeline (LE analytics 2.1.2):

```json
{
  "Processors": [
    { "ID": "11111111-1111-1111-1111-111111111111", "Name": "source", "FunctionName": "DataHub Subscribe", "IsActive": true,
      "Config": { "settings": { "topic": "devicehub.alias.<device>.<tag>" } },
      "InputEvents": [], "InputWaits": [], "InputsDefinitions": {}, "State": null, "Outputs": ["22222222-2222-2222-2222-222222222222"] },
    { "ID": "22222222-2222-2222-2222-222222222222", "Name": "extract-one-key", "FunctionName": "JSONata", "IsActive": true,
      "Config": { "settings": { "jsonata_schema": "{\"value\": value}" } },
      "InputEvents": ["11111111-1111-1111-1111-111111111111"], "InputWaits": [], "InputsDefinitions": {}, "State": null, "Outputs": ["33333333-3333-3333-3333-333333333333"] },
    { "ID": "33333333-3333-3333-3333-333333333333", "Name": "publish", "FunctionName": "DataHub Publish", "IsActive": true,
      "Config": { "settings": { "single_topic": true, "topic": "ABC.temperature" } },
      "InputEvents": ["22222222-2222-2222-2222-222222222222"], "InputWaits": [], "InputsDefinitions": {}, "State": null, "Outputs": [] }
  ],
  "Positions": [
    { "ID": "11111111-1111-1111-1111-111111111111", "Top": 16,  "Left": 0, "GroupName": "My Pipeline" },
    { "ID": "22222222-2222-2222-2222-222222222222", "Top": 120, "Left": 0, "GroupName": "My Pipeline" },
    { "ID": "33333333-3333-3333-3333-333333333333", "Top": 220, "Left": 0, "GroupName": "My Pipeline" }
  ],
  "Version": "2.1.2", "Git": "", "AIModels": [], "Variables": [], "Parameters": []
}
```

Note the mirroring: `source.Outputs` -> jsonata, `jsonata.InputEvents` -> source and `jsonata.Outputs` -> publish, `publish.InputEvents` -> jsonata. Set the source `topic` to a real DataHub topic (DeviceHub tag topics look like `devicehub.alias.<device>.<tag>`).

Send it: `ImportGroup(groupName: "My Pipeline", data: "<the JSON above, stringified>")`. Then verify in the UI, or with `Get Processor Instances` / `Get Processor Last Value`.

---

## Verify

| Check | API Name | Operation |
|------|----------|-----------|
| Processor exists and is active | Get Processor Instances | `GetProcessorInstances` (input = group) |
| Last value flowing | Get Processor Last Value | `GetProcessorLastValue` |
| Errors on a node | Get Processor Last Error | `GetProcessorLastError` |
| Live output | Subscribe to Processor Output | streaming |

If `Get Processor Last Value` on the SINK is empty, the wires are missing or pointing the wrong way. Remember `Connect Processor Instances` is reversed: `Inputs` = the upstream producer(s), `Outputs` = the downstream consumer(s).

---

## Back out

Single endpoint detail (params, GraphQL fields) -> /le/analytics.md. Other workflows -> /workflows/agents.md.
