Auditing executed actions

Auditing an Adabas Database

Due to regulatory requirements, there is an increasing need to log the actions of users. Critical actions may even need to be checked for correctness. This article shows what such auditing of actions in an Adabas database can look like. To this end, the logs produced by Adabas are transformed into human-readable log files and then sent to an Elasticsearch or OpenSearch cluster for further processing.

Enabling internal Adabas Logging

To store the logs produced by the Adabas database in the first place, command logging must be activated via parameters of the Adabas nucleus.

This can be achieved by setting CLOGLAYOUT=5 and LOGGING=CB, either in the .INI file used at startup or by means of the ADANUC command while the database is running. Other possible values for the logging configuration can be found in the documentation, but logging only the control block (CB) proved to be the best trade-off: it keeps the relevant information without costing too much database performance.
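
As a minimal sketch, setting both parameters via ADANUC could look like this (the exact invocation and parameter syntax may differ between Adabas versions, so verify it against the documentation of your installation):

# Start the nucleus of database 10 (the dbid used in the scripts below)
# with CLOGLAYOUT=5 and LOGGING=CB as described above.
adanuc dbid=10 cloglayout=5 logging=CB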

The logs are then written as a binary file to the location defined by the CLPCLG environment variable.
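
For example (a hypothetical location that matches the directory layout assumed by the scripts below):

export CLPCLG=/data/backup/ada/db010/clog/NUCCLG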

Activate Adabas log rotation – first option

Depending on the database throughput, the log files can become very large in a short amount of time. There is no way to activate logging only for certain users or certain database commands; filtering only happens later, when the ADACLP command is applied.

By default, Adabas will not create a new log file on its own; it will only do so after the database is restarted. One way to tell Adabas to do some form of log rotation is to provide two or more locations for storing the logs via the CLPCLG parameter. The effect is described in the database documentation:

Adabas starts with the first environment variable (in this case NUCPLG), or with its default if the environment variable is not set (the default is a file in the current directory with the same name as the environment variable). When the end of the file is reached, or when the file system becomes full (in cases where a file system is used), the file will be closed, and the next environment variable (NUCPLG2) will be translated and the associated file or disk section opened. Then it allocates free space from that disk section to be used for the next extent. If a subsequent environment variable does not exist (e.g. NUCPLG4), Adabas will wrap around to the first disk section (NUCPLG).

The quoted passage describes the protection log variables (NUCPLG*); the mechanism applies analogously to the command log. It is important to mention that when Adabas wraps around to the first disk section, it does not automatically delete the old logs. Instead, logging simply stops once the file storage has no space left.
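
By analogy with the quoted NUCPLG scheme, the additional locations would be provided via numbered environment variables. Note that the CLPCLG2 naming below is an assumption derived from that analogy; verify it against your documentation:

# Two command log locations for wrap-around rotation (naming by analogy, unverified).
export CLPCLG=/data/backup/ada/db010/clog/NUCCLG
export CLPCLG2=/data/backup/ada/db010/clog/NUCCLG2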

Activate Adabas log rotation – second option (recommended)

The second option gives more control over when a new log file is created: the ADAOPR command provides the FEOF parameter for exactly this purpose. The current log file is closed and a new file is created. Executing this command in an automated job every 30 minutes makes sure that the log files do not become too large.
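
Such a job could be scheduled via cron, for example (the script path is hypothetical; the actual rotation is the adaopr call shown in the script below):

# Rotate the Adabas command logs of all databases every 30 minutes.
*/30 * * * * /versis/sag/scripts/rotate_clog.sh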

For both options there is still the need to delete old CLOG files manually, since Adabas will not do the job for you. That is why this step is also included in the following script.

#!/bin/bash

CURRENT_USER=$(whoami)
echo "Execute script as user:" $CURRENT_USER

if [ "$CURRENT_USER" = "sag" ]; then
  . /versis/sag/tools/sag.env > /dev/null
fi

DBS=(10 11)
DATE_FILE_DIR=/data
ADA_DIR=/data/backup/ada

for db in "${DBS[@]}"; do
  echo "Open new clog file for database:" $db
  adaopr dbid=$db feof=clog
  if [ $? -ne 0 ]; then echo "Command not successful" && exit 1; fi

  # Skip the cleanup if the export script has not recorded a successful run yet.
  if [ ! -f "$DATE_FILE_DIR/last_successful_run_$db" ]; then echo "No export run recorded yet, skipping cleanup" && continue; fi
  last_run=$(cat "$DATE_FILE_DIR/last_successful_run_$db")
  echo "Delete files since last clog export:" $last_run

  # Any file that has not been written to since the last export run may be deleted,
  # except for the most recent Adabas log file. grep -F matches the file name
  # literally instead of interpreting it as a regular expression.
  deleteable_files=$(find "$ADA_DIR/db0${db}/clog" -maxdepth 1 -type f -not -newermt "$last_run" | grep -vF "$(ls "$ADA_DIR"/db0${db}/clog/NUCCLG.* -Art | tail -n 1)")
  for file in $deleteable_files; do
    echo "Delete file:" $file
    rm -f "$file"
  done
done
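
Note that this cleanup relies on the last_successful_run_<db> timestamp files written by the export script in the next section, so it should only run after at least one export has completed.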

Saving logs as human-readable files

To extract information from those files, the ADACLP command has to be used. The output of this command can then be redirected to a file. For automation, the following script can be used after providing the correct database IDs, the subset of relevant users and the relevant database commands. Only the filtered information will be available in the resulting logs. Some directories in the script must be adapted as well: DATE_FILE_DIR tells the script where to look for and save the files containing the timestamp of its last run, so that only the actions between the last execution and the current timestamp are processed. ADA_DIR is where the CLOG files are stored (after appending /db0$db/clog to the directory), and TARGET_DIR is the location where the created log files are saved.

#!/bin/bash

CURRENT_USER=$(whoami)
echo "Execute script as user:" $CURRENT_USER

if [ "$CURRENT_USER" = "sag" ]; then
  . /versis/sag/tools/sag.env > /dev/null
fi

DBS=(10 11)
USERS=('user1' 'user2' 'sag')
COMMANDS="(A1,E1,CL,N1,N2,OP)"
DATE_FILE_DIR=/data
ADA_DIR=/data/backup/ada
TARGET_DIR=/data/central-logs

for db in "${DBS[@]}"; do
  echo "Use database: " $db

  DATE_FILE=$DATE_FILE_DIR/last_successful_run_$db
  TIMESTAMP=$(date +"%Y-%m-%d %H:%M:%S")
  TIMESTAMP_ADACLP=$(LANG="en_US" date -d "$TIMESTAMP" +"%d-%b-%Y:%H:%M:%S")
  TIMESTAMP_LAST_RUN="1970-01-01 00:00:00"
  if [ -f "$DATE_FILE" ]; then TIMESTAMP_LAST_RUN=$(cat "$DATE_FILE"); fi
  TIMESTAMP_LAST_RUN_ADACLP=$(LANG="en_US" date -d "$TIMESTAMP_LAST_RUN" +"%d-%b-%Y:%H:%M:%S")
  echo "Last successful run:" $TIMESTAMP_LAST_RUN

  FOLDER=$ADA_DIR/db0$db/clog
  FILES=($(find "$FOLDER" -type f -name "NUCCLG*" -newermt "$TIMESTAMP_LAST_RUN" | grep -v clog$))

  for file in "${FILES[@]}"; do
    echo "Use command log:" $file
    export CLPCLG=$file

    for user in "${USERS[@]}"; do
      echo "Use user:" $user

      # The export file name is derived from the command log's base name.
      EXPORTFILE=$(basename "$file")-$db-$user
      adaclp dbid=$db login_id=$user command="$COMMANDS" date="($TIMESTAMP_LAST_RUN_ADACLP,$TIMESTAMP_ADACLP)" > "$TARGET_DIR/${EXPORTFILE}.tmp"
      RC=$?
      if [ $RC -eq 255 ]; then echo "Skip: Unexpected EOL" && continue; fi
      if [ $RC -ne 0 ]; then echo "ADACLP return code: " $RC && exit 1; fi

      # Rename only after ADACLP succeeded, so that Filebeat never picks up partial files.
      mv "$TARGET_DIR/${EXPORTFILE}.tmp" "$TARGET_DIR/${EXPORTFILE}.clog"
      RC=$?
      if [ $RC -ne 0 ]; then echo "Move return code: " $RC && exit 1; fi
    done
  done

  echo "------------------------"
  echo $TIMESTAMP > $DATE_FILE
done

Transferring data to the ELK stack

The resulting files can then be collected by Filebeat, processed by Logstash, and stored in an Elasticsearch or OpenSearch cluster. Filebeat transfers the logs to Logstash line by line, but some of the lines can be ignored, and the information stored in each line is not always the same. For the best results, the following Filebeat configuration and Logstash pipeline can be used. For every command, a short description is added during the execution of the Logstash pipeline.

filebeat.inputs:
  - type: filestream
    id: adabas-clog-filestream
    paths:
      - /path/to/files/*.clog
    exclude_lines: ['^%', '^-', 'RECORD', '\f']
    close.on_state_change.inactive: 2h

output.logstash:
  hosts: ["logstash:5044"]
input
{
    beats
    {
        port => 5044
        client_inactivity_timeout => 604800
    }
}

filter
{
    grok
    {
        # columns 'cop' and 'x' are empty
        match => { "message" => "%{SPACE}%{BASE10NUM:record}%{SPACE}%{MONTHDAY:day}-(?<month>\b(?:JAN?|FEB?|MAR?|APR?|MAY|JUN?|JUL?|AUG?|SEP?|OCT?|NOV?|DEC?)\b)-%{YEAR:year}%{SPACE}%{TIME:time}%{SPACE}%{NOTSPACE:duration}%{SPACE}%{NOTSPACE:user_id}%{SPACE}%{NOTSPACE:node_id}%{SPACE}%{NOTSPACE:login_id}%{SPACE}%{NOTSPACE:command}%{SPACE}%{BASE10NUM:response}%{SPACE}%{NOTSPACE:cid_hex}/(?<cid>.{4})%{SPACE}%{BASE10NUM:file}%{SPACE}%{BASE10NUM:isn}%{SPACE}%{BASE10NUM:th}%{SPACE}%{BASE10NUM:ioa}%{SPACE}%{BASE10NUM:iod}%{SPACE}%{BASE10NUM:iow}" }

        # column 'x' is empty
        match => { "message" => "%{SPACE}%{BASE10NUM:record}%{SPACE}%{MONTHDAY:day}-(?<month>\b(?:JAN?|FEB?|MAR?|APR?|MAY|JUN?|JUL?|AUG?|SEP?|OCT?|NOV?|DEC?)\b)-%{YEAR:year}%{SPACE}%{TIME:time}%{SPACE}%{NOTSPACE:duration}%{SPACE}%{NOTSPACE:user_id}%{SPACE}%{NOTSPACE:node_id}%{SPACE}%{NOTSPACE:login_id}%{SPACE}%{NOTSPACE:command}%{SPACE}%{BASE10NUM:response}%{SPACE}%{NOTSPACE:cid_hex}/(?<cid>.{4})%{SPACE}%{NOTSPACE:cop}%{SPACE}%{BASE10NUM:file}%{SPACE}%{BASE10NUM:isn}%{SPACE}%{BASE10NUM:th}%{SPACE}%{BASE10NUM:ioa}%{SPACE}%{BASE10NUM:iod}%{SPACE}%{BASE10NUM:iow}" }

        # no empty column
        match => { "message" => "%{SPACE}%{BASE10NUM:record}%{SPACE}%{MONTHDAY:day}-(?<month>\b(?:JAN?|FEB?|MAR?|APR?|MAY|JUN?|JUL?|AUG?|SEP?|OCT?|NOV?|DEC?)\b)-%{YEAR:year}%{SPACE}%{TIME:time}%{SPACE}%{NOTSPACE:duration}%{SPACE}%{NOTSPACE:user_id}%{SPACE}%{NOTSPACE:node_id}%{SPACE}%{NOTSPACE:login_id}%{SPACE}%{NOTSPACE:command}%{SPACE}%{BASE10NUM:response}%{SPACE}%{NOTSPACE:cid_hex}/(?<cid>.{4})%{SPACE}%{NOTSPACE:cop}%{SPACE}%{NOTSPACE:x}%{SPACE}%{BASE10NUM:file}%{SPACE}%{BASE10NUM:isn}%{SPACE}%{BASE10NUM:th}%{SPACE}%{BASE10NUM:ioa}%{SPACE}%{BASE10NUM:iod}%{SPACE}%{BASE10NUM:iow}" }
    }

    grok
    {
        match => { "[log][file][path]" => "/path/to/file/NUCCLG.%{NOTSPACE}-%{INT:database_id}.clog" }
    }

    mutate
    {
        capitalize => ["month"]
    }

    mutate
    {
        add_field => { "timestamp" => "%{year}-%{month}-%{day} %{time}"}
    }

    date
    {
        locale => "en"
        match => [ "timestamp" , "yyyy-MMM-dd HH:mm:ss" ]
        target => "timestamp"
        timezone => "Europe/Berlin"
    }

    translate
    {
        source => "[command]"
        target => "[command_description]"
        dictionary =>
        {
            "A1" => "Modify the value of one or more fields within a record."
            "BT" => "Remove all the database modifications (adds, deletes, updates) performed during the user's current logical transaction."
            "C1" => "Request that a checkpoint be taken."
            "C5" => "Write user data to the Adabas protection log."
            "CL" => "Terminate a user session."
            "E1" => "Delete a record."
            "ET" => "End a logical transaction or subtransaction."
            "HI" => "Place record in hold status, i.e. lock the records for other users."
            "L1" => "Read a single record from Data Storage."
            "L4" => "Read a single record from Data Storage."
            "L2" => "Read a record from a set of records which are stored in physical sequence in Data Storage."
            "L5" => "Read a record from a set of records which are stored in physical sequence in Data Storage."
            "L3" => "Read a file in logical sequential order, based on the sequence of the values for a given descriptor."
            "L6" => "Read a file in logical sequential order, based on the sequence of the values for a given descriptor."
            "L9" => "Determine the range of values present for a descriptor and the number of records which contain each value."
            "LF" => "Read the field definition information for a file."
            "MC" => "Reduce the interprocess communications between an Adabas database and its application programs. (Multi-Call)"
            "N1" => "Add a new record to a file."
            "N2" => "Add a new record to a file."
            "OP" => "Indicate the beginning of a user session."
            "RC" => "Release one or more command IDs that are currently assigned to a user."
            "RE" => "Read user data which has been previously stored in an Adabas system file by a CL or ET command."
            "RI" => "Release the lock for a record or to downgrade the lock level from exclusive to shared."
            "S1" => "Select records that satisfy a given search criterion."
            "S2" => "Select records that satisfy a given search criterion."
            "S4" => "Select records that satisfy a given search criterion."
            "S8" => "Perform logical processing on two ISN lists which have been previously created with Sx commands."
            "S9" => "Sort an ISN list which was created using an Sx command, or to sort a list of ISNs provided by the user."
        }
        fallback => "-"
    }
}

output
{
    ...
}
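
The output section is intentionally left open above, since it depends on the target cluster. As a minimal sketch (the host and index name are placeholders, and authentication settings are omitted), the elasticsearch output plugin could be configured like this:

output
{
    elasticsearch
    {
        # Placeholder host and daily index name; add user/password or api_key settings as required.
        hosts => ["https://elasticsearch:9200"]
        index => "adabas-clog-%{+YYYY.MM.dd}"
    }
}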
