IBM i and Apache Kafka in 2026: Journal-Based CDC to Kafka Topics, PASE Producers, and Event-Driven IBM i Integration

The previous post covered IBM i display files and 5250 UX modernisation — DDS syntax, subfile programming in RPG, indicator handling, and the options for replacing or wrapping green-screen interfaces with modern web front-ends. This post covers connecting IBM i to Apache Kafka for real-time event streaming: why Kafka matters for IBM i integration, running producers and consumers in PASE, journal-based change data capture as a reliable CDC source, the IBM MQ bridge pattern for Kafka Connect, and practical architecture guidance for IBM i as a Kafka participant.

Why Kafka Matters for IBM i

IBM i has always been excellent at storing and processing transactional data — orders, shipments, financial postings, inventory movements. The challenge in 2026 is that this data needs to reach downstream consumers in real time: microservices running in Kubernetes, data lakes in S3 or Azure Data Lake, analytics pipelines in Spark or Databricks, and mobile applications that expect live order status.

Traditionally, IBM i integration relied on scheduled batch extracts, FTP file drops, or direct DB2 queries from external systems. These approaches are fragile, laggy, and tightly coupled — if the downstream system changes its query pattern, the IBM i is impacted. Apache Kafka solves this with a durable, replayable event log. IBM i becomes a producer of events (order created, shipment despatched, inventory adjusted), and any number of downstream consumers subscribe independently. Neither side needs to know about the other’s availability or schema evolution at the point of data production.

The decoupling benefit is significant: IBM i batch windows stop being a bottleneck, downstream systems consume at their own pace, and, provided the topic's retention is set long enough, the topic log doubles as an audit trail that a consumer joining months later can replay from the beginning.

Kafka Concepts Recap

A brief orientation for IBM i practitioners who are new to Kafka:

  • Topic — a named, ordered, durable log of messages. Analogous to a DB2 physical file, but append-only and distributed. Example: ibmi.orders.created.
  • Producer — a client that appends messages to a topic. Your IBM i PASE process is a producer.
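  • Consumer — a client that pulls messages from a topic and tracks its position with an offset. The broker does not track delivery of individual messages; consumers commit their offsets back to Kafka, which is what lets them stop, resume, or replay.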
  • Broker — a Kafka server node that stores topic partitions. A Kafka cluster has one or more brokers.
  • Consumer group — a set of consumers that share partition assignments. Each partition is consumed by exactly one member of a group at a time, enabling parallel processing.
  • Partition — a topic is split into N partitions for parallelism and throughput. Messages within a partition are strictly ordered; across partitions, order is not guaranteed.
  • Offset — the sequential position of a message within a partition. Consumers commit offsets to track progress; on restart, they resume from the last committed offset.
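The consumer-group rule in the list above says that within a group each partition has exactly one owner, while a member may own several partitions or none. A simple round-robin assignment illustrates this. It is a sketch of the idea only, not Kafka's implementation: the real assignors (range, round-robin, cooperative-sticky) run inside the client libraries and also handle rebalancing when members join or leave.

```python
def assign_round_robin(partitions: int, members: list) -> dict:
    """Give each partition 0..partitions-1 exactly one owner, cycling through members.

    A member can own several partitions. If there are more members than
    partitions, the surplus members own nothing and sit idle.
    """
    if not members:
        raise ValueError("a consumer group needs at least one member")
    ordered = sorted(members)                      # deterministic member order
    owners = {member: [] for member in ordered}
    for partition in range(partitions):
        owners[ordered[partition % len(ordered)]].append(partition)
    return owners


if __name__ == "__main__":
    # Six partitions (as in ibmi.orders.created) shared by four consumers
    print(assign_round_robin(6, ["consumer-a", "consumer-b", "consumer-c", "consumer-d"]))
    # {'consumer-a': [0, 4], 'consumer-b': [1, 5], 'consumer-c': [2], 'consumer-d': [3]}
```

Adding a fifth and sixth consumer to this group would give every member one partition. A seventh consumer would receive nothing, because partition count caps a group's parallelism.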

For IBM i integration, a topic like ibmi.orders.created with 6 partitions, keyed by customer number, ensures that all events for a given customer land on the same partition (preserving per-customer ordering) while allowing 6-way parallel consumption downstream.
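Per-customer ordering works because a keyed message is mapped to a partition by hashing its key. The sketch below reproduces the hash used by the Java client's default partitioner for keyed messages: murmur2 over the key bytes, masked to a positive 31-bit value, then taken modulo the partition count. kafkajs's DefaultPartitioner (v2 and later) is documented as Java-compatible. librdkafka, and therefore confluent-kafka in Python, defaults to a different CRC32-based partitioner (consistent_random) unless its partitioner setting is murmur2_random. Producers in different languages that write the same topic should therefore be configured to agree. The customer numbers below are hypothetical.

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2, following the Java Kafka client's Utils.murmur2 (unsigned result)."""
    mask = 0xFFFFFFFF
    m = 0x5BD1E995
    r = 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask          # seed XOR length
    body = length - length % 4
    for i in range(0, body, 4):               # mix 4 little-endian bytes at a time
        k = data[i] | (data[i + 1] << 8) | (data[i + 2] << 16) | (data[i + 3] << 24)
        k = (k * m) & mask
        k ^= k >> r
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k
    remaining = length % 4                    # tail bytes, same fall-through as the Java switch
    if remaining == 3:
        h ^= data[body + 2] << 16
    if remaining >= 2:
        h ^= data[body + 1] << 8
    if remaining >= 1:
        h ^= data[body]
        h = (h * m) & mask
    h ^= h >> 13                              # final avalanche
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for(key: str, num_partitions: int) -> int:
    """Partition for a keyed record: positive murmur2 hash modulo the partition count."""
    return (murmur2(key.encode("utf-8")) & 0x7FFFFFFF) % num_partitions


if __name__ == "__main__":
    for customer in ["100123", "100124", "100123"]:
        print(customer, partition_for(customer, 6))   # same key -> same partition every time
```

The mapping depends on the partition count. If partitions are added to a topic later, existing keys move to different partitions, so per-key ordering is not preserved across that change.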

Running Kafka Clients in PASE

IBM i PASE (Portable Application Solutions Environment) provides an AIX-compatible runtime on IBM i. Node.js and Python run natively in PASE, and both have mature Kafka client libraries. Install them from an SSH session to the IBM i PASE shell:

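# Install Node.js (if not already present via IBM i Open Source Packages)
yum install nodejs20

# Install kafkajs (pure JavaScript Kafka client, no native dependencies) and odbc,
# which the producer scripts below use; odbc is a native module built against unixODBC
npm install kafkajs odbc

# Install Python 3, confluent-kafka and pyodbc (used by the consumer later in this post)
# confluent-kafka wraps the librdkafka C library and PyPI publishes no wheels for PASE,
# so pip builds it from source: install librdkafka and its headers first
yum install python39
pip3 install confluent-kafka pyodbc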

Verify the PASE environment can reach your Kafka broker (adjust the broker hostname and port as appropriate):

# Test TCP connectivity from PASE to the broker's TLS listener
# (9093 matches the client configuration used in the scripts below)
/QOpenSys/usr/bin/telnet kafka-broker.internal 9093

# Or use nc (netcat) if available
nc -zv kafka-broker.internal 9093

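If your Kafka cluster requires TLS and SASL authentication, the PASE process needs a CA certificate file it can read. Confluent Cloud uses SASL/PLAIN with API keys, while SASL/SCRAM is common on self-managed clusters and is what the examples below use. A cluster with publicly signed certificates, such as Confluent Cloud, can be verified with a public CA bundle like the one below. A cluster signed by an internal CA needs that CA's certificate instead:

# Download the public CA bundle published by the curl project (extracted from Mozilla's CA store)
curl -o /home/kafkauser/ca-cert.pem \
  https://curl.se/ca/cacert.pem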

Producing IBM i DB2 Changes to Kafka — Polling Approach

The simplest producer pattern polls a DB2 for i table at a regular interval and identifies rows modified since the last run, using a timestamp column or an auto-increment sequence. It then produces each changed row as a JSON message to a Kafka topic. This polling approach to CDC is quick to implement, but it cannot see deleted rows and is less reliable than journal-based CDC.

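Add a row change timestamp column to the order header table if one does not exist. Db2 for i sets it automatically whenever a row is inserted or updated. Adding a column changes the record format level identifier. RPG or COBOL programs that use native record-level access to ORDHDR must therefore be recompiled, or switched to a logical file that omits the new column, to avoid level-check errors:

-- Add a row change timestamp column to ORDHDR
ALTER TABLE ORDLIB.ORDHDR
  ADD COLUMN OHCHGTS TIMESTAMP NOT NULL
  GENERATED ALWAYS FOR EACH ROW ON UPDATE AS ROW CHANGE TIMESTAMP;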

-- Create an index to make the polling query efficient
CREATE INDEX ORDLIB.ORDHDR_CHGTS
  ON ORDLIB.ORDHDR (OHCHGTS);

The Node.js producer script, saved to /home/kafkauser/ibmi-order-producer.js:

'use strict';
const { Kafka, Partitioners } = require('kafkajs');
const odbc  = require('odbc');
const fs    = require('fs');

// ── Configuration ─────────────────────────────────────────────────
const KAFKA_BROKERS   = ['kafka-broker.internal:9093'];
const KAFKA_TOPIC     = 'ibmi.orders.created';
const DB_CONN         = 'DSN=IBMI_PROD;UID=KAFKAUSER;PWD=secret';
const STATE_FILE      = '/home/kafkauser/.last_poll_ts';
const POLL_INTERVAL   = 30000; // 30 seconds

const kafka = new Kafka({
  clientId: 'ibmi-order-producer',
  brokers:  KAFKA_BROKERS,
  ssl: {
    ca: [fs.readFileSync('/home/kafkauser/ca-cert.pem')],
  },
  sasl: {
    mechanism: 'scram-sha-256',
    username:  process.env.KAFKA_USER,
    password:  process.env.KAFKA_PASS,
  },
});

const producer = kafka.producer({
  createPartitioner: Partitioners.DefaultPartitioner,
});

// ── State management — persist last polled timestamp ─────────────
function readLastTs() {
  try {
    return fs.readFileSync(STATE_FILE, 'utf8').trim();
  } catch {
    return '2000-01-01 00:00:00.000000'; // Bootstrap: all history
  }
}

function writeLastTs(ts) {
  fs.writeFileSync(STATE_FILE, ts, 'utf8');
}

// ── Main poll loop ────────────────────────────────────────────────
async function pollAndProduce() {
  let conn;
  try {
    conn = await odbc.connect(DB_CONN);
    const lastTs = readLastTs();

    const rows = await conn.query(
      `SELECT OHORDNO, OHCUST, OHORDT, OHAMT, OHSTAT,
              VARCHAR_FORMAT(OHCHGTS, 'YYYY-MM-DD HH24:MI:SS.FF6') AS CHG_TS
         FROM ORDLIB.ORDHDR
        WHERE OHCHGTS > TIMESTAMP(?)
        ORDER BY OHCHGTS
        FETCH FIRST 500 ROWS ONLY`,
      [lastTs]
    );

    if (rows.length === 0) {
      console.log(`[${new Date().toISOString()}] No new rows since ${lastTs}`);
      return;
    }

    const messages = rows.map(row => ({
      key:   String(row.OHCUST).trim(),   // Partition by customer number
      value: JSON.stringify({
        eventType:      'ORDER_CHANGED',
        orderNumber:    row.OHORDNO,
        customerNumber: row.OHCUST,
        orderDate:      row.OHORDT,
        orderAmount:    parseFloat(row.OHAMT),
        orderStatus:    row.OHSTAT.trim(),
        changedAt:      row.CHG_TS,
        source:         'ibmi-ordlib',
      }),
      headers: {
        'content-type': 'application/json',
        'source-system': 'IBM_I',
      },
    }));

    await producer.send({ topic: KAFKA_TOPIC, messages });

    const newLastTs = rows[rows.length - 1].CHG_TS;
    writeLastTs(newLastTs);
    console.log(`[${new Date().toISOString()}] Produced ${rows.length} messages. Last TS: ${newLastTs}`);

  } catch (err) {
    console.error('Poll error:', err.message);
  } finally {
    if (conn) await conn.close();
  }
}

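async function run() {
  await producer.connect();
  console.log('Producer connected to Kafka.');
  // Wait POLL_INTERVAL after each poll finishes before starting the next one.
  // Unlike setInterval, this never starts a poll while the previous one is still running.
  for (;;) {
    await pollAndProduce();
    await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL));
  }
}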

run().catch(err => {
  console.error('Fatal:', err.message);
  process.exit(1);
});

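Run this as a persistent PASE process, for example with nohup. Put the environment variable assignments before the command so that the Node.js process inherits them. Anything after the script name is passed to the script as an argument instead:

KAFKA_USER=kafkauser KAFKA_PASS=secret \
  nohup node /home/kafkauser/ibmi-order-producer.js \
  >> /home/kafkauser/producer.log 2>&1 &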
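The polling producer advances its watermark with OHCHGTS > ? and fetches at most 500 rows per cycle. Suppose two or more rows share the last OHCHGTS value in a batch and the row limit cuts between them. The remaining rows are then never fetched. A common fix is a compound watermark of (change timestamp, order number) with a matching ORDER BY, sketched here in plain Python. The SQL predicate in the comment is an assumed way to express this against ORDHDR. The sketch does not address rows whose timestamp was assigned before a long-running transaction committed; those can still fall behind the watermark, which is one reason to move to journal-based CDC.

```python
from typing import List, NamedTuple, Tuple

# Assumed SQL equivalent for the producer's polling query:
#   WHERE OHCHGTS > ? OR (OHCHGTS = ? AND OHORDNO > ?)
#   ORDER BY OHCHGTS, OHORDNO
#   FETCH FIRST 500 ROWS ONLY


class Row(NamedTuple):
    changed_at: str  # 'YYYY-MM-DD HH:MM:SS.ffffff': fixed width, so text order == time order
    order_no: int


Watermark = Tuple[str, int]


def next_batch_naive(rows: List[Row], last_ts: str, limit: int):
    """Timestamp-only watermark, as in the Node.js producer."""
    batch = sorted(r for r in rows if r.changed_at > last_ts)[:limit]
    return batch, (batch[-1].changed_at if batch else last_ts)


def next_batch(rows: List[Row], watermark: Watermark, limit: int):
    """Compound watermark: only rows strictly after (changed_at, order_no)."""
    batch = sorted(r for r in rows if (r.changed_at, r.order_no) > watermark)[:limit]
    return batch, ((batch[-1].changed_at, batch[-1].order_no) if batch else watermark)


def drain(step, rows, start, limit):
    """Poll repeatedly until a cycle returns nothing; collect the order numbers seen."""
    seen, mark = [], start
    while True:
        batch, mark = step(rows, mark, limit)
        if not batch:
            return seen
        seen.extend(r.order_no for r in batch)


if __name__ == "__main__":
    ts = "2026-06-01 10:00:00.000000"
    rows = [Row(ts, 1), Row(ts, 2), Row(ts, 3)]
    print(drain(next_batch_naive, rows, "2000-01-01 00:00:00.000000", 2))  # [1, 2]
    print(drain(next_batch, rows, ("2000-01-01 00:00:00.000000", 0), 2))   # [1, 2, 3]
```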

Journal-Based CDC — The Reliable Approach

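Polling CDC has fundamental weaknesses. Rows that are inserted and deleted between poll cycles are invisible, and deletes are never seen at all. A row whose timestamp was set before a long-running transaction committed can also fall behind the watermark and be skipped. IBM i journals provide a far more reliable CDC source. Every change to a journalled physical file or SQL table is recorded as a journal entry in the order it occurs. Each entry carries the after-image of the row, plus the before-image when the file is journalled with IMAGES(*BOTH). Changes made under commitment control are followed by a commit (C/CM) or rollback (C/RB) entry, so a reader can tell which changes actually took effect.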

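Record-level changes are written with journal code R. The main entry types are:

  • PT: record added (PX when the record is added by relative record number).
  • UB: update before-image, written only when the file is journalled with IMAGES(*BOTH).
  • UP: update after-image.
  • DL: record deleted.

Entries carry sequence numbers that increase in the order the changes were journalled, and a CDC reader tracks its progress by these numbers.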
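Turning these entry types into change events usually follows three rules. PT/PX entries become inserts, DL entries become deletes, and each UB before-image is paired with the UP after-image that follows it for the same record. The Python sketch below assumes the entries have already been read into a simple structure, for example from DISPLAY_JOURNAL or a DSPJRN outfile. The JournalEntry and ChangeEvent classes are hypothetical. Pairing by relative record number assumes nothing else touches that record between its UB and UP entries, and the sketch treats the image carried by a DL entry as the deleted row.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, Optional


@dataclass
class JournalEntry:
    seq: int          # journal sequence number
    entry_type: str   # PT, PX, UB, UP or DL
    rrn: int          # relative record number of the affected row
    image: bytes      # record image from the entry-specific data


@dataclass
class ChangeEvent:
    op: str                  # INSERT, UPDATE or DELETE
    seq: int
    rrn: int
    before: Optional[bytes]  # None when no before-image is available
    after: Optional[bytes]


def to_change_events(entries: Iterable[JournalEntry]) -> Iterator[ChangeEvent]:
    pending_before: Dict[int, bytes] = {}   # UB images waiting for their UP, keyed by RRN
    for e in sorted(entries, key=lambda x: x.seq):   # replay in journal order
        if e.entry_type in ("PT", "PX"):
            yield ChangeEvent("INSERT", e.seq, e.rrn, None, e.image)
        elif e.entry_type == "UB":
            pending_before[e.rrn] = e.image
        elif e.entry_type == "UP":
            # Without IMAGES(*BOTH) there is no UB entry, so before stays None
            yield ChangeEvent("UPDATE", e.seq, e.rrn, pending_before.pop(e.rrn, None), e.image)
        elif e.entry_type == "DL":
            yield ChangeEvent("DELETE", e.seq, e.rrn, e.image, None)
        # Any other entry type is ignored in this sketch
```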

Query journal entries using the QSYS2.DISPLAY_JOURNAL table function, available on IBM i 7.3 and later with the relevant PTF:

-- Query journal entries for ORDLIB/ORDHDR since a given timestamp
SELECT
    SEQUENCE_NUMBER,
    ENTRY_TIMESTAMP,
    JOURNAL_CODE,
    JOURNAL_ENTRY_TYPE,
    OBJECT_LIBRARY,
    OBJECT_NAME,
    CAST(ENTRY_DATA AS VARCHAR(32000)) AS ROW_DATA
FROM TABLE(
    QSYS2.DISPLAY_JOURNAL(
        JOURNAL_LIBRARY     => 'ORDLIB',
        JOURNAL_NAME        => 'ORDJRN',
        STARTING_TIMESTAMP  => TIMESTAMP('2026-06-01 00:00:00'),
        JOURNAL_CODES       => 'R',            -- R = record-level changes
        JOURNAL_ENTRY_TYPES => 'PT,UB,UP,DL'   -- Insert, update before-image, update after-image, delete
    )
)
WHERE OBJECT_LIBRARY = 'ORDLIB'
  AND OBJECT_NAME    = 'ORDHDR'
ORDER BY SEQUENCE_NUMBER;

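The ENTRY_DATA column contains the raw binary row image, so parsing it requires knowledge of the physical file record layout. A common production alternative is the DSPJRN command with OUTPUT(*OUTFILE), which writes one row per journal entry into an output file. Each row has fixed columns for the sequence number, entry type, timestamp and relative record number, followed by the entry-specific data (JOESD). JOESD still holds the record image rather than individual fields, but the header columns are easy to query from PASE. The program below takes the starting sequence number as a parameter:

/* CL program: DUMPJRN — extract journal entries to an output PF */
/* QTEMP is private to each job: run this program in the same job   */
/* that reads QTEMP/JRNOUT (e.g. via QSYS2.QCMDEXC on its ODBC link) */
PGM        PARM(&FROMSEQ)

  /* Starting journal sequence number as digits, or *FIRST */
  DCL VAR(&FROMSEQ) TYPE(*CHAR) LEN(20)

  DSPJRN    JRN(ORDLIB/ORDJRN)
            FILE((ORDLIB/ORDHDR *ALL))
            RCVRNG(*CURCHAIN)
            FROMENTLRG(&FROMSEQ)
            JRNCDE((R))
            ENTTYP(PT PX UP DL)
            OUTPUT(*OUTFILE)
            OUTFILFMT(*TYPE5)
            OUTFILE(QTEMP/JRNOUT)
            OUTMBR(*FIRST *REPLACE)
  /* CPF7062: no entries to convert, so nothing new to send */
  MONMSG    MSGID(CPF7062)

  RETURN
ENDPGM

The Node.js journal CDC producer reads from the QTEMP/JRNOUT output file rather than polling the live table. It resumes from the last journal sequence number it processed, so changes made between runs are not missed. This holds as long as the journal receivers covering that range still exist and the sequence has not been reset (CHGJRN SEQOPT(*RESET) restarts numbering at 1). Each run processes one batch, so schedule the script to run repeatedly:

// journal-cdc-producer.js — CDC from IBM i journal output file
'use strict';
const { Kafka, Partitioners } = require('kafkajs');
const odbc = require('odbc');
const fs   = require('fs');

const KAFKA_BROKERS = ['kafka-broker.internal:9093'];
const KAFKA_TOPIC   = 'ibmi.orders.cdc';
const DB_CONN       = 'DSN=IBMI_PROD;UID=KAFKAUSER;PWD=secret';
const STATE_FILE    = '/home/kafkauser/.last_jrn_seq';

// Same TLS and SASL/SCRAM settings as the polling producer (9093 is the TLS listener)
const kafka = new Kafka({
  clientId: 'ibmi-cdc',
  brokers:  KAFKA_BROKERS,
  ssl:      { ca: [fs.readFileSync('/home/kafkauser/ca-cert.pem')] },
  sasl: {
    mechanism: 'scram-sha-256',
    username:  process.env.KAFKA_USER,
    password:  process.env.KAFKA_PASS,
  },
});
const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner });

// PT/PX = record added, UP = update after-image, DL = record deleted
const EVENT_MAP = { PT: 'INSERT', PX: 'INSERT', UP: 'UPDATE', DL: 'DELETE' };

function readLastSeq() {
  try { return BigInt(fs.readFileSync(STATE_FILE, 'utf8').trim()); }
  catch { return 0n; }
}

function writeLastSeq(seq) {
  fs.writeFileSync(STATE_FILE, seq.toString(), 'utf8');
}

async function processJournalEntries() {
  let conn;
  try {
    conn = await odbc.connect(DB_CONN);
    const lastSeq = readLastSeq();

    // QTEMP belongs to this connection's server job, so DUMPJRN runs here,
    // through QSYS2.QCMDEXC, for the SELECT below to see QTEMP/JRNOUT.
    // Starting at lastSeq (already sent) keeps the start point inside the
    // receiver chain; the WHERE clause then drops that entry.
    const fromSeq = lastSeq === 0n ? '*FIRST' : lastSeq.toString();
    await conn.query(
      `CALL QSYS2.QCMDEXC('CALL PGM(ORDLIB/DUMPJRN) PARM(''${fromSeq}'')')`
    );

    // *TYPE5 outfile columns used here:
    //   JOSEQN sequence number, JOENTT entry type, JOTSTP timestamp,
    //   JOMBR member, JOCTRR relative record number, JOESD entry-specific data
    const rows = await conn.query(
      `SELECT JOSEQN, JOENTT, JOTSTP, JOMBR, JOCTRR, JOESD
         FROM QTEMP.JRNOUT
        WHERE JOSEQN > ?
        ORDER BY JOSEQN
        FETCH FIRST 1000 ROWS ONLY`,
      [lastSeq.toString()]   // a string keeps 20-digit sequence numbers exact
    );

    if (rows.length === 0) return;

    const messages = rows.map(row => {
      const entryType = row.JOENTT ? String(row.JOENTT).trim() : 'UNKNOWN';
      return {
        // The RRN keeps each record's changes in order, but RRNs can be reused
        // after deletes; a business key decoded from JOESD is a better long-term key.
        key:   String(row.JOCTRR),
        value: JSON.stringify({
          eventType:  EVENT_MAP[entryType] || entryType,
          sequenceNo: String(row.JOSEQN),
          timestamp:  row.JOTSTP,
          member:     row.JOMBR ? String(row.JOMBR).trim() : '',
          relRecNo:   String(row.JOCTRR),
          // Raw record image, base64-encoded. Packed and zoned fields survive only
          // if JOESD reaches PASE as binary (e.g. CCSID 65535), not converted text.
          rawData:    row.JOESD ? Buffer.from(row.JOESD).toString('base64') : '',
          source:     'ibmi-journal-cdc',
        }),
      };
    });

    await producer.send({ topic: KAFKA_TOPIC, messages });
    const newSeq = BigInt(String(rows[rows.length - 1].JOSEQN));
    writeLastSeq(newSeq);
    console.log(`CDC: produced ${rows.length} journal entries up to seq ${newSeq}`);

  } finally {
    if (conn) await conn.close();
  }
}

async function run() {
  await producer.connect();
  try {
    await processJournalEntries();
  } finally {
    await producer.disconnect();
  }
}

run().catch(err => { console.error(err.message); process.exit(1); });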
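The rawData field carries the record image, not decoded fields. Character fields in that image are EBCDIC, and numeric fields in DDS-described files are usually packed or zoned decimal, so a downstream consumer has to slice the image by the ORDHDR record layout. The Python sketch below assumes the image arrives as raw bytes. If the ODBC driver converts JOESD as text, packed fields are corrupted on the way. The sketch also uses a hypothetical layout: OHORDNO packed (7,0), OHCUST packed (6,0), OHAMT packed (11,2), OHSTAT char(1). Take the real offsets and types from DSPFFD ORDLIB/ORDHDR. cp037 is Python's built-in EBCDIC US/Canada codec; use the codec that matches the file's CCSID.

```python
from decimal import Decimal
from typing import Dict, List, Tuple, Union

# Hypothetical ORDHDR layout: (field, offset, length in bytes, kind, decimal places)
ORDHDR_LAYOUT: List[Tuple[str, int, int, str, int]] = [
    ("OHORDNO", 0, 4, "packed", 0),   # packed (7,0)  -> 4 bytes
    ("OHCUST",  4, 4, "packed", 0),   # packed (6,0)  -> 4 bytes
    ("OHAMT",   8, 6, "packed", 2),   # packed (11,2) -> 6 bytes
    ("OHSTAT", 14, 1, "char",   0),   # char (1)
]

POSITIVE_SIGNS = {0xA, 0xC, 0xE, 0xF}
NEGATIVE_SIGNS = {0xB, 0xD}


def _signed(digits: List[int], sign: int, scale: int) -> Decimal:
    """Combine decimal digit nibbles and a sign nibble into a scaled Decimal."""
    if any(d > 9 for d in digits):
        raise ValueError("invalid digit nibble")
    if sign not in POSITIVE_SIGNS | NEGATIVE_SIGNS:
        raise ValueError(f"invalid sign nibble {sign:#x}")
    value = int("".join(map(str, digits)) or "0")
    if sign in NEGATIVE_SIGNS:
        value = -value
    return Decimal(value).scaleb(-scale)


def unpack_packed(raw: bytes, scale: int) -> Decimal:
    """Packed decimal: two digits per byte, sign in the low nibble of the last byte."""
    digits: List[int] = []
    for b in raw[:-1]:
        digits += [b >> 4, b & 0x0F]
    digits.append(raw[-1] >> 4)
    return _signed(digits, raw[-1] & 0x0F, scale)


def unpack_zoned(raw: bytes, scale: int) -> Decimal:
    """Zoned decimal: one digit per byte, sign in the high nibble of the last byte."""
    return _signed([b & 0x0F for b in raw], raw[-1] >> 4, scale)


def decode_record(image: bytes, layout=ORDHDR_LAYOUT,
                  codec: str = "cp037") -> Dict[str, Union[Decimal, str]]:
    """Slice a fixed-length record image into named, decoded fields."""
    record: Dict[str, Union[Decimal, str]] = {}
    for name, offset, length, kind, scale in layout:
        raw = image[offset:offset + length]
        if kind == "packed":
            record[name] = unpack_packed(raw, scale)
        elif kind == "zoned":
            record[name] = unpack_zoned(raw, scale)
        else:
            record[name] = raw.decode(codec).rstrip()   # EBCDIC text, trailing blanks removed
    return record
```

Because the Node.js producer above base64-encodes rawData, a consumer would call base64.b64decode on that field before passing the bytes to decode_record.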

Kafka Connect for IBM i — The MQ Bridge

For organisations that already run IBM MQ on IBM i, the cleanest Kafka integration path is the IBM MQ source connector for Kafka Connect. IBM MQ has native IBM i support (as part of the MQ for IBM i licensed product), and the Kafka MQ connector is available from IBM’s GitHub repository and the Confluent Hub.

The pipeline is: IBM i RPG/CL → IBM MQ queue → Kafka MQ Source Connector → Kafka topic.

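IBM MQ for IBM i does not include a CL command that puts a message on a queue. Programs use the MQI calls (MQCONN, MQOPEN, MQPUT) from RPG, COBOL or C. In the CL below, the JSON payload is built in CL and handed to PUTMQMSG, a hypothetical site-written RPG program that wraps those MQI calls. PUTMQMSG should put the message with format MQFMT_STRING so that it can be converted to text downstream:

/* CL: SENDMQMSG — send an order event to IBM MQ */
PGM       PARM(&ORDNO &CUSTNO &ORDAMT)

  DCL VAR(&ORDNO)   TYPE(*DEC) LEN(7 0)
  DCL VAR(&CUSTNO)  TYPE(*DEC) LEN(6 0)
  DCL VAR(&ORDAMT)  TYPE(*DEC) LEN(11 2)
  DCL VAR(&MSG)     TYPE(*CHAR) LEN(256)
  /* MQ object names are 48 characters; declare them instead of passing */
  /* short literals, which CALL pads to only 32 bytes                   */
  DCL VAR(&QMGR)    TYPE(*CHAR) LEN(48) VALUE('IBMIQMGR')
  DCL VAR(&QNAME)   TYPE(*CHAR) LEN(48) VALUE('ORDEREVENT.Q')

  /* Build the JSON payload; *CAT needs character operands, so */
  /* %CHAR converts the decimal parameters first                */
  CHGVAR VAR(&MSG) VALUE('{"orderNo":' *CAT %CHAR(&ORDNO) *TCAT +
         ',"custNo":' *CAT %CHAR(&CUSTNO) *TCAT +
         ',"amount":' *CAT %CHAR(&ORDAMT) *TCAT '}')

  /* Put the message on ORDEREVENT.Q via the hypothetical MQI wrapper */
  CALL PGM(ORDLIB/PUTMQMSG) PARM(&QMGR &QNAME &MSG)

ENDPGM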

Configure the IBM MQ source connector on the Kafka Connect worker, which runs on Linux or in a container. The target topic is set with the connector's topic property. Set mq.message.body.jms to true so that a message put with format MQFMT_STRING is read as a JMS text message: the MQ client converts it from EBCDIC, and the StringConverter writes it to Kafka as text. With the default of false, the connector passes the raw message bytes, and those need the ByteArrayConverter instead:

{
  "name": "ibmi-mq-source",
  "config": {
    "connector.class":         "com.ibm.eventstreams.connect.mqsource.MQSourceConnector",
    "tasks.max":               "1",
    "mq.queue.manager":        "IBMIQMGR",
    "mq.connection.name.list": "ibmi.example.com(1414)",
    "mq.channel.name":         "KAFKA.SVRCONN",
    "mq.queue":                "ORDEREVENT.Q",
    "mq.user.name":            "MQKAFKA",
    "mq.password":             "secret",
    "mq.message.body.jms":     "true",
    "mq.record.builder":       "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder",
    "topic":                   "ibmi.orders.created",
    "key.converter":           "org.apache.kafka.connect.storage.StringConverter",
    "value.converter":         "org.apache.kafka.connect.storage.StringConverter"
  }
}

Save the configuration as ibmi-mq-source-config.json and deploy the connector via the Kafka Connect REST API:

curl -X POST http://kafka-connect.internal:8083/connectors \
  -H 'Content-Type: application/json' \
  -d @ibmi-mq-source-config.json

Once running, every message placed on ORDEREVENT.Q by IBM i is automatically forwarded to the ibmi.orders.created Kafka topic. The IBM MQ bridge decouples the IBM i from the Kafka cluster’s availability — if Kafka is briefly unavailable, messages queue in MQ and drain when Kafka recovers.

Consuming Kafka from IBM i

IBM i is not only a Kafka producer. In event-driven architectures, IBM i may need to consume events produced by other systems — for example, consuming a payments.confirmed topic to trigger IBM i order fulfilment, or consuming a pricing.updated topic to refresh DB2 price tables.

A Python Kafka consumer in PASE that writes consumed records back into DB2 for i:

#!/QOpenSys/pkgs/bin/python3
# kafka-consumer.py — consume Kafka topic and write to DB2 for i
import json
import os
import sys
import time
from decimal import Decimal, InvalidOperation

import pyodbc
from confluent_kafka import Consumer, KafkaError, TopicPartition

KAFKA_CONFIG = {
    'bootstrap.servers':  'kafka-broker.internal:9093',
    'security.protocol':  'SASL_SSL',
    'sasl.mechanism':     'SCRAM-SHA-256',
    'sasl.username':      os.environ['KAFKA_USER'],
    'sasl.password':      os.environ['KAFKA_PASS'],
    'ssl.ca.location':    '/home/kafkauser/ca-cert.pem',
    'group.id':           'ibmi-payments-consumer',
    'auto.offset.reset':  'earliest',
    'enable.auto.commit': False,   # Commit only after the DB2 write succeeds (at-least-once)
}

TOPIC       = 'payments.confirmed'
DB_CONN_STR = 'DSN=IBMI_PROD;UID=KAFKAUSER;PWD=secret'
RETRY_DELAY = 5.0   # seconds to wait before retrying a failed DB2 write

def write_payment_to_db2(conn, payload):
    cursor = conn.cursor()
    # Parameter markers are cast to explicit types; these column types are
    # assumptions about ORDLIB.PAYRCV, so match them to the real table.
    cursor.execute(
        """
        MERGE INTO ORDLIB.PAYRCV AS T
        USING (VALUES (CAST(? AS DECIMAL(7, 0)),
                       CAST(? AS VARCHAR(50)),
                       CAST(? AS DECIMAL(11, 2)),
                       CAST(? AS VARCHAR(26))))
              AS S(ORDNO, PAYREF, PAYAMT, PAYTS)
           ON T.PRORDNO = S.ORDNO
        WHEN MATCHED THEN
          UPDATE SET PRPAYREF = S.PAYREF,
                     PRAMT    = S.PAYAMT,
                     PRTS     = TIMESTAMP(S.PAYTS),
                     PRSTATUS = 'CONFIRMED'
        WHEN NOT MATCHED THEN
          INSERT (PRORDNO, PRPAYREF, PRAMT, PRTS, PRSTATUS)
          VALUES (S.ORDNO, S.PAYREF, S.PAYAMT, TIMESTAMP(S.PAYTS), 'CONFIRMED')
        """,
        (
            int(payload['orderNumber']),
            str(payload['paymentReference']),
            Decimal(str(payload['amount'])),   # Decimal avoids binary floating-point rounding
            str(payload['confirmedAt']),
        )
    )
    conn.commit()

def main():
    consumer = Consumer(KAFKA_CONFIG)
    consumer.subscribe([TOPIC])

    db_conn = pyodbc.connect(DB_CONN_STR, autocommit=False)
    print(f'Consumer started. Subscribed to {TOPIC}')

    try:
        while True:
            msg = consumer.poll(timeout=5.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    print(f'Partition EOF: {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
                else:
                    print(f'Kafka error: {msg.error()}', file=sys.stderr)
                continue

            try:
                payload = json.loads(msg.value().decode('utf-8'))
                write_payment_to_db2(db_conn, payload)
                consumer.commit(message=msg, asynchronous=False)
                print(f'Written payment ref {payload.get("paymentReference")} for order {payload.get("orderNumber")}')
            except (ValueError, KeyError, InvalidOperation) as e:
                # Malformed message (bad JSON or encoding, missing field, bad number).
                # Retrying cannot fix it, and committing any later offset would skip it
                # anyway, so commit past it explicitly. Production code would copy it to
                # a dead-letter topic first.
                print(f'Bad message at offset {msg.offset()}: {e}', file=sys.stderr)
                consumer.commit(message=msg, asynchronous=False)
            except pyodbc.Error as e:
                print(f'DB2 write error: {e}', file=sys.stderr)
                db_conn.rollback()
                # Rewind to the failed message so the next poll() retries it,
                # instead of moving on and later committing past it
                consumer.seek(TopicPartition(msg.topic(), msg.partition(), msg.offset()))
                time.sleep(RETRY_DELAY)

    except KeyboardInterrupt:
        print('Shutting down consumer.')
    finally:
        consumer.close()
        db_conn.close()

if __name__ == '__main__':
    main()

Key design decisions in this consumer:

  • Manual offset commit (enable.auto.commit: False) — the offset is only committed after the DB2 write succeeds. This prevents message loss if the DB2 write fails.
  • MERGE statement — idempotent upsert handles redelivery. If Kafka redelivers the same message (e.g., after a crash before commit), the MERGE overwrites with the same data rather than inserting a duplicate.
  • Keep CL work out of the hot path — if consuming an event should trigger IBM i processing (e.g., SBMJOB to start a batch process), submit a job rather than running the work inline in the consumer loop. From PASE, run the command either through CALL QSYS2.QCMDEXC on the consumer's existing DB2 connection or with the PASE system utility.
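The redelivery argument for the MERGE can be checked off-box. This sketch uses Python's built-in sqlite3 purely as a stand-in for DB2 for i. SQLite's INSERT ... ON CONFLICT DO UPDATE (SQLite 3.24 or later) plays the role of the MERGE, and the payrcv table and payload are hypothetical. Delivering the same payment twice leaves one row with the upsert, but two rows with a plain INSERT into a table without a key.

```python
import sqlite3


def make_db(with_key: bool) -> sqlite3.Connection:
    """In-memory stand-in for ORDLIB.PAYRCV, with or without a key on order_no."""
    conn = sqlite3.connect(":memory:")
    key = "PRIMARY KEY" if with_key else ""
    conn.execute(f"CREATE TABLE payrcv (order_no INTEGER {key}, pay_ref TEXT, amount TEXT, status TEXT)")
    return conn


def upsert_payment(conn: sqlite3.Connection, payload: dict) -> None:
    """Idempotent write: a redelivered message overwrites the row with identical data."""
    with conn:  # commit on success, roll back on exception
        conn.execute(
            """INSERT INTO payrcv (order_no, pay_ref, amount, status)
               VALUES (?, ?, ?, 'CONFIRMED')
               ON CONFLICT(order_no) DO UPDATE SET
                   pay_ref = excluded.pay_ref,
                   amount  = excluded.amount,
                   status  = 'CONFIRMED'""",
            (int(payload["orderNumber"]), payload["paymentReference"], str(payload["amount"])),
        )


def insert_payment(conn: sqlite3.Connection, payload: dict) -> None:
    """Non-idempotent write: every delivery adds another row."""
    with conn:
        conn.execute(
            "INSERT INTO payrcv VALUES (?, ?, ?, 'CONFIRMED')",
            (int(payload["orderNumber"]), payload["paymentReference"], str(payload["amount"])),
        )


def row_count(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT COUNT(*) FROM payrcv").fetchone()[0]


if __name__ == "__main__":
    payment = {"orderNumber": "123", "paymentReference": "PAY-0001", "amount": "99.50"}
    safe, unsafe = make_db(with_key=True), make_db(with_key=False)
    for _ in range(2):  # simulate a redelivery after a crash before the offset commit
        upsert_payment(safe, payment)
        insert_payment(unsafe, payment)
    print(row_count(safe), row_count(unsafe))  # 1 2
```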

Calling a CL command from PASE Python with the PASE system utility:

import subprocess

def run_ibmi_command(cmd_string):
    """Run an IBM i CL command from PASE using system()."""
    result = subprocess.run(
        ['/QOpenSys/usr/bin/system', cmd_string],
        capture_output=True,
        text=True
    )
    if result.returncode != 0:
        raise RuntimeError(f'CL command failed: {result.stderr.strip()}')
    return result.stdout.strip()

# Trigger order fulfilment batch job after payment confirmed
run_ibmi_command("SBMJOB CMD(CALL PGM(ORDLIB/FULFIL) PARM('0000123')) "
                 "JOB(FULFIL) JOBD(ORDLIB/ORDJOBD) JOBQ(ORDLIB/ORDJOBQ)")
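Two refinements are worth sketching. First, if any part of the command comes from a Kafka payload (the order number in this example), validate it before building the command string. Otherwise a crafted message could inject additional CL. Second, the same command can run through the QSYS2.QCMDEXC SQL procedure on the consumer's existing pyodbc connection. Binding the command as a parameter means embedded quotes need no escaping. The helper names are hypothetical, and the seven-digit zero-padded format simply mirrors PARM('0000123') above.

```python
import re

FULFIL_COMMAND = (
    "SBMJOB CMD(CALL PGM(ORDLIB/FULFIL) PARM('{order}')) "
    "JOB(FULFIL) JOBD(ORDLIB/ORDJOBD) JOBQ(ORDLIB/ORDJOBQ)"
)


def build_fulfil_command(order_number) -> str:
    """Build the SBMJOB string, accepting only 1-7 digit order numbers."""
    text = str(order_number).strip()
    if not re.fullmatch(r"[0-9]{1,7}", text):
        raise ValueError(f"refusing to build a CL command from order number {order_number!r}")
    return FULFIL_COMMAND.format(order=text.zfill(7))


def run_cl_via_sql(conn, command: str) -> None:
    """Run a CL command through QSYS2.QCMDEXC on an existing DB API connection.

    The command is bound as a parameter, so quotes inside it need no escaping.
    This function does not commit; note that a later rollback does not withdraw
    a job that SBMJOB has already submitted.
    """
    cursor = conn.cursor()
    try:
        cursor.execute("CALL QSYS2.QCMDEXC(?)", (command,))
    finally:
        cursor.close()
```

In the consumer above, run_cl_via_sql(db_conn, build_fulfil_command(payload['orderNumber'])) would submit the fulfilment job once the payment row has been written.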

Practical Architecture Patterns

Several design decisions arise when integrating IBM i with Kafka at production scale:

  • IBM i as producer only (one-way) — the most common starting pattern. IBM i emits business events (orders, shipments, inventory) and downstream systems subscribe. No Kafka consumption on IBM i is needed. This is low-risk and high-value.
  • Bi-directional IBM i participation — IBM i both produces and consumes. This is appropriate when IBM i needs to react to events from other systems (payment confirmations, pricing updates, fulfilment requests from an order management system). Design the consume-and-write path to be idempotent from the start.
  • Partition key design — key Kafka messages by the IBM i primary key (customer number, order number) to preserve per-entity ordering. Avoid random partition assignment for IBM i data — downstream consumers typically need to process all events for a given entity in sequence.
  • Exactly-once semantics — Kafka supports exactly-once processing through transactional producers (the transactionalId option in kafkajs, transactional.id in confluent-kafka). However, a Kafka transaction only covers reads and writes within Kafka and cannot include a DB2 commit. For IBM i consumers, the practical choice is idempotency (MERGE, not INSERT) plus manual offset commit. If the DB2 write and the consumed position genuinely must be atomic, store the consumed offset in a DB2 table in the same commitment-control transaction as the data. Then seek to that stored offset when partitions are assigned.
  • Schema registry — for long-lived topics with schema evolution, use a Confluent Schema Registry with Avro or JSON Schema. The producers in this post emit plain JSON. Once they serialise through a registry-aware serializer instead, each schema change (such as a new field added to ORDHDR) is checked against the subject's compatibility rule (BACKWARD by default), which prevents breaking downstream consumers.
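  • Monitoring — track consumer-group lag, for example with kafka-consumer-groups.sh --describe or Confluent Control Center. The client-side JMX lag metrics (such as records-lag-max) come from the Java consumer, so they do not cover Node.js or Python consumers running in PASE. A consumer group that is falling behind on ibmi.orders.created is the first signal that a downstream processor is under-resourced or has encountered a processing error.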
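When the DB2 write and the consumed position must be atomic, a common technique is to store the consumed offset in a database table inside the same commit as the data, and to resume from that stored offset after a restart. The sketch below shows this without a broker. sqlite3 stands in for DB2 for i and a plain list stands in for one Kafka partition; the table names are hypothetical. Each message's data row and the partition's next offset commit together. A restarted consumer resumes from the stored offset, so each message is applied exactly once even though the payrcv insert is deliberately not idempotent. With confluent-kafka, the resume step would happen in an on_assign callback that sets each TopicPartition's offset from the table before calling assign().

```python
import sqlite3
from typing import List, Tuple

TOPIC, PARTITION = "payments.confirmed", 0


def open_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE payrcv (order_no INTEGER PRIMARY KEY, pay_ref TEXT);
        CREATE TABLE consumer_offsets (
            topic TEXT, part INTEGER, next_offset INTEGER,
            PRIMARY KEY (topic, part));
        """
    )
    return conn


def stored_offset(conn: sqlite3.Connection) -> int:
    row = conn.execute(
        "SELECT next_offset FROM consumer_offsets WHERE topic = ? AND part = ?",
        (TOPIC, PARTITION),
    ).fetchone()
    return row[0] if row else 0   # no stored position yet: start at the beginning


def apply(conn: sqlite3.Connection, offset: int, order_no: int, pay_ref: str) -> None:
    with conn:  # the data row and the new offset commit, or roll back, together
        conn.execute("INSERT INTO payrcv VALUES (?, ?)", (order_no, pay_ref))
        conn.execute(
            """INSERT INTO consumer_offsets VALUES (?, ?, ?)
               ON CONFLICT(topic, part) DO UPDATE SET next_offset = excluded.next_offset""",
            (TOPIC, PARTITION, offset + 1),
        )


def consume(conn: sqlite3.Connection, partition_log: List[Tuple[int, str]], max_messages: int) -> None:
    """Process up to max_messages from the stored offset; stopping early simulates a crash."""
    start = stored_offset(conn)
    for offset in range(start, min(len(partition_log), start + max_messages)):
        order_no, pay_ref = partition_log[offset]
        apply(conn, offset, order_no, pay_ref)


if __name__ == "__main__":
    partition_log = [(101, "PAY-A"), (102, "PAY-B"), (103, "PAY-C")]
    db = open_db()
    consume(db, partition_log, max_messages=2)   # "crash" after two messages
    consume(db, partition_log, max_messages=10)  # restart resumes at offset 2
    print(db.execute("SELECT COUNT(*) FROM payrcv").fetchone()[0], stored_offset(db))  # 3 3
```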

Next post: DB2 for i Replication — journal-based logical replication, Q-Replication architecture with InfoSphere Data Replication, MIMIX replication, and how to design a DB2 for i replication strategy for reporting offload and disaster recovery.
