Compare commits

...

2 Commits

Author SHA1 Message Date
matmoul fa7df95d32 feat: persist KeePass snapshots to disk
Load existing db.kdbx files on demand, initialize missing databases with the default snapshot, and save changes back to the same path. Also add keyFiles and header metadata fields to the snapshot types.
2026-05-10 01:25:26 +02:00
matmoul 15332896fe feat: add native KDBX scaffolding and in-memory KeePass API 2026-05-10 01:17:53 +02:00
26 changed files with 481 additions and 709 deletions
+12 -16
View File
@@ -1,16 +1,14 @@
# Project
## Goal
Provide a TypeScript wrapper around KeePass `.kdbx` databases using a Python bridge powered by `pykeepass`.
Provide a TypeScript library for reading and writing KeePass `.kdbx` databases.
## Architecture
- Public API is TypeScript.
- `src/python/bridge.py` is the runtime backend and uses `pykeepass`.
- TypeScript spawns a Python process per request; there is no persistent worker yet.
- JSON is exchanged over stdin/stdout.
- Bridge errors, empty output, invalid JSON, missing files, and backend exceptions are surfaced as TypeScript errors.
- Bridge error reporting now distinguishes invalid KeePass requests from backend errors.
- Coverage now includes `keyFile` payload propagation, nested group payload shaping, and core API smoke checks.
- The runtime backend is native TypeScript/JavaScript.
- Python is used only as a compatibility reference during development and testing.
- Keep the implementation split between KDBX format handling, domain model mapping, and the public API.
- Read/write support must remain deterministic and easy to validate against `pykeepass`.
## Public API
- `openKeePassDatabase(path, options)`
@@ -29,24 +27,22 @@ Provide a TypeScript wrapper around KeePass `.kdbx` databases using a Python bri
- Find queries support partial matching on `title`, `username`, `url`, and `groupPath`.
## Runtime details
- Python path defaults to `.venv/bin/python3`.
- It can be overridden with `PYTHON_PATH`.
- `bun run setup:python` creates `.venv` if needed and installs `pykeepass`.
- The bridge also works with an existing project-local virtual environment.
- The library should run without Python in production.
- Python may still be required for compatibility tests and fixture generation.
- Prefer Bun for scripts and tests.
## Fixtures and tests
- Bundled fixtures: `tests/fixtures/data.kdbx` and `tests/fixtures/empty.kdbx`.
- Companion JSON fixture: `tests/fixtures/data.kdbx.json` stores the password and expected content.
- Unit tests in `tests/unit/` mock the child process and validate bridge parsing, error handling, command forwarding, and payload shaping.
- Integration tests in `tests/integration/` use `data.kdbx` to verify entries, groups, partial search, OTP/TOTP output, write persistence on a temporary copy, and nested group creation when `pykeepass` is installed.
- The integration test runner checks for `pykeepass` and skips cleanly when it is unavailable.
- Tests should compare the native implementation against `pykeepass` when available.
- Keep temporary-copy write tests and nested group behavior tests.
- Memory tracking files: `.memory/state.md` and `.memory/todo.md`.
## Main scripts
- `bun run test` / `bun test`
- `bun run src/example.ts`
- `bun run src/test-integration.ts`
- `bun run setup:python`
- Compatibility helpers may be added later if needed.
## Current direction
Keep improving failure-path coverage, keep write support minimal and predictable, and continue validating persistence on temporary copies and nested group behavior.
Implement native KDBX read/write support in TypeScript and validate behavior against `pykeepass` as the reference implementation.
+4 -6
View File
@@ -1,8 +1,6 @@
# State
- Hardened bridge error handling and nested group path resolution.
- Added unit coverage for invalid JSON, empty output, nested group path forwarding, and keyFile payloads.
- Added integration coverage for creating groups on temporary copies.
- Latest test run passed: 20 tests, 0 failures.
- VS Code red squiggles on Bun/Node imports were addressed by including `bun-types` in `tsconfig.json` and covering `tests/**/*.ts`.
- Project renamed to ts-pykeepass-wrapper; current focus remains the TypeScript wrapper + Python bridge for KeePass.
- Reframed the project as a fresh TypeScript-native KeePass library.
- Python/pykeepass is now only a compatibility reference during development.
- Added initial KDBX format scaffolding and crypto helpers.
- The current runtime is still mostly in-memory and does not yet decrypt real databases.
+4 -3
View File
@@ -1,5 +1,6 @@
# Todo
- Keep write-path behavior predictable and well-documented.
- Preserve minimal API surface until update/delete/move is required.
- Consider typed bridge errors in TypeScript if more granularity is needed later.
- Implement native KDBX parsing and serialization.
- Decode and decrypt the real KDBX payload.
- Add compatibility tests against `pykeepass` for read/write parity.
- Decide the exact scope of KDBX versions and features to support first.
+19 -93
View File
@@ -1,113 +1,39 @@
# ts-pykeepass-wrapper
# KeePass TypeScript Library
TypeScript wrapper around `pykeepass` for reading and modifying KeePass `.kdbx` files.
A small TypeScript library for working with KeePass `.kdbx` databases.
## Overview
## Status
This project uses:
- TypeScript as the public API
- Python as the runtime backend
- `pykeepass` to read and update KeePass databases
The TypeScript layer launches a Python bridge per request and exchanges JSON through stdin/stdout.
## Requirements
- Node.js or Bun
- Python 3
- `pykeepass` installed in the Python environment used by the bridge (the project provides `bun run setup:python`)
- The bridge defaults to `.venv/bin/python3`, or you can override with `PYTHON_PATH`
## Python setup
Install `pykeepass` in the Python environment used by the bridge:
```bash
bun run setup:python
```
If you prefer manual installation:
```bash
python3 -m venv .venv && .venv/bin/pip install pykeepass
```
The bridge also works with a project-local virtual environment such as `.venv`.
This branch is a fresh start.
The runtime implementation is being rebuilt in TypeScript, and `pykeepass` is used only as a compatibility reference during development.
## Usage
```ts
import { openKeePassDatabase } from "./src/keepass";
const db = openKeePassDatabase("tests/fixtures/data.kdbx", {
password: "123",
const db = openKeePassDatabase("example.kdbx", {
password: "secret",
});
const entries = await db.listEntries();
console.log(entries);
```
## Example
Run the example using the bundled data fixture credentials:
```bash
bun run src/example.ts
```
## API
### `openKeePassDatabase(path, options)`
Creates a database wrapper.
#### Options
- `password`: KeePass master password
- `keyFile`: optional key file path
### `listEntries()`
Returns all entries in the database, with every entry field exposed by the bridge (`title`, `username`, `password`, `url`, `notes`, `groupPath`, `otp` when present).
### `findEntries(query)`
Finds entries by partial match and returns the full entry objects.
### `listGroups()`
Returns all groups, including their names and paths.
### `createEntry(entry)`
Creates a new entry in the target database and persists it immediately.
#### Entry input
- `title`: entry title
- `username`: optional username
- `password`: optional password
- `url`: optional URL
- `notes`: optional notes
- `groupPath`: optional target group path
`groupPath` is resolved as an existing group path when possible. Nested paths such as `Folder1/SubFolder` are supported when the target group exists.
### `createGroup(group)`
Creates a new group and persists it immediately.
#### Group input
- `name`: group name
- `path`: optional parent group path
`path` is resolved as an existing parent group path when possible, including nested paths.
### `save()`
Persists the current database state.
### `close()`
No-op for now.
- `openKeePassDatabase(path, options)`
- `listEntries()`
- `findEntries(query)`
- `listGroups()`
- `createEntry(entry)`
- `createGroup(group)`
- `save()`
- `close()`
## Notes
- The bridge currently launches a Python process per call.
- This is simple and robust for a first version.
- Errors from the Python bridge are propagated to the TypeScript API, including invalid or empty output. Bridge failures are normalized to distinguish invalid requests and backend errors.
- A persistent Python process can be added later if needed.
- Write operations currently open, modify, and save the database per command.
- Bundled fixtures include `tests/fixtures/data.kdbx` and `tests/fixtures/empty.kdbx`; the companion JSON file stores the password and expected content for tests/examples.
- Integration tests validate the bundled `data.kdbx` entry-by-entry and group-by-group against `tests/fixtures/data.kdbx.json`, and may skip cleanly when `pykeepass` is unavailable.
- `password` is required when opening a database.
- `keyFile` is reserved in the public types.
- The current codebase is intentionally minimal while the native KDBX implementation is rebuilt.
- Fixtures and compatibility tests can be added or refined as the format implementation grows.
+25
View File
@@ -0,0 +1,25 @@
{
"header": {
"version": {
"major": 0,
"minor": 0
}
},
"groups": [
{
"name": "Racine",
"path": ""
}
],
"entries": [
{
"title": "Entry",
"username": "",
"password": "",
"url": "",
"notes": "",
"groupPath": ""
}
],
"keyFiles": []
}
+1 -20
View File
@@ -1,20 +1 @@
{
"name": "ts-pykeepass-wrapper",
"packageManager": "bun@1.0.0",
"version": "0.1.0",
"private": true,
"type": "module",
"scripts": {
"example": "bun run src/example.ts",
"validate": "bun run test",
"test": "bun test",
"test:unit": "bun test",
"test:integration": "bun run src/test-integration.ts",
"setup:python": "test -x .venv/bin/python3 || python3 -m venv .venv && .venv/bin/pip install pykeepass"
},
"dependencies": {},
"devDependencies": {
"typescript": "^5.5.0",
"bun-types": "^1.1.0"
}
}
{"name": "ts-pykeepass-wrapper", "packageManager": "bun@1.0.0", "version": "0.1.0", "private": true, "type": "module", "scripts": {"example": "bun run src/example.ts", "validate": "bun run test", "test": "bun test", "test:unit": "bun test", "test:integration": "bun run src/test-integration.ts"}, "dependencies": {}, "devDependencies": {"typescript": "^5.5.0", "bun-types": "^1.1.0"}}
+9 -5
View File
@@ -1,12 +1,16 @@
import { readFile } from "node:fs/promises";
import { openKeePassDatabase } from "./keepass";
async function main() {
const { password } = JSON.parse(await readFile("tests/fixtures/data.kdbx.json", "utf8")) as { password: string };
const db = openKeePassDatabase("memory.kdbx", { password: "demo" });
await db.createGroup({ name: "Folder1" });
await db.createEntry({
title: "Entry",
username: "user",
password: "secret",
groupPath: "Folder1",
});
const db = openKeePassDatabase("tests/fixtures/data.kdbx", { password });
const entries = await db.listEntries();
console.log(entries);
console.log(await db.listEntries());
}
main().catch((error) => {
+10 -3
View File
@@ -1,9 +1,16 @@
export { KeePassDatabase, openKeePassDatabase } from "./keepass";
export type {
KeePassCommand,
KeePassEntry,
KeePassEntryInput,
KeePassFindQuery,
KeePassGroup,
KeePassGroupInput,
KeePassOpenOptions,
KeePassResponse,
} from "./types";
} from "./types";
export type {
KdbxDatabaseSnapshot,
KdbxEntry,
KdbxGroup,
KdbxHeader,
KdbxVersion,
} from "./kdbx";
+45
View File
@@ -0,0 +1,45 @@
export class BinaryReader {
private offset = 0;
constructor(private readonly buffer: Uint8Array) {}
readUint8(): number {
this.ensureAvailable(1);
return this.buffer[this.offset++];
}
readUint16LE(): number {
this.ensureAvailable(2);
const value = this.buffer[this.offset] | (this.buffer[this.offset + 1] << 8);
this.offset += 2;
return value;
}
readUint32LE(): number {
this.ensureAvailable(4);
const value =
(this.buffer[this.offset] |
(this.buffer[this.offset + 1] << 8) |
(this.buffer[this.offset + 2] << 16) |
(this.buffer[this.offset + 3] << 24)) >>> 0;
this.offset += 4;
return value;
}
readBytes(length: number): Uint8Array {
this.ensureAvailable(length);
const slice = this.buffer.slice(this.offset, this.offset + length);
this.offset += length;
return slice;
}
remaining(): number {
return this.buffer.length - this.offset;
}
private ensureAvailable(length: number): void {
if (this.offset + length > this.buffer.length) {
throw new Error(`Unexpected end of buffer while reading ${length} byte(s)`);
}
}
}
+15
View File
@@ -0,0 +1,15 @@
import { createHash, pbkdf2Sync } from "node:crypto";
export function sha256(data: Uint8Array | string): Uint8Array {
const hash = createHash("sha256");
hash.update(data);
return new Uint8Array(hash.digest());
}
export function deriveKey(password: string, salt: Uint8Array, rounds: number, length = 32): Uint8Array {
if (!Number.isFinite(rounds) || rounds <= 0) {
throw new Error("Invalid key derivation rounds");
}
return new Uint8Array(pbkdf2Sync(password, Buffer.from(salt), rounds, length, "sha256"));
}
+17
View File
@@ -0,0 +1,17 @@
import { BinaryReader } from "./binary";
import { parseKdbxHeader } from "./header";
import type { KdbxHeader } from "./types";
export type KdbxFile = {
header: KdbxHeader;
payload: Uint8Array;
};
export function parseKdbxFile(buffer: Uint8Array): KdbxFile {
const { header, offset } = parseKdbxHeader(buffer);
const reader = new BinaryReader(buffer.subarray(offset));
return {
header,
payload: reader.readBytes(reader.remaining()),
};
}
+24
View File
@@ -0,0 +1,24 @@
import { BinaryReader } from "./binary";
import type { KdbxHeader } from "./types";
const KDBX_SIGNATURE_1 = 0x9aa2d903;
const KDBX_SIGNATURE_2 = 0xb54bfb67;
export function parseKdbxHeader(buffer: Uint8Array): { header: KdbxHeader; offset: number } {
const reader = new BinaryReader(buffer);
const signature1 = reader.readUint32LE();
const signature2 = reader.readUint32LE();
if (signature1 !== KDBX_SIGNATURE_1 || signature2 !== KDBX_SIGNATURE_2) {
throw new Error("Invalid KDBX signature");
}
const versionMinor = reader.readUint16LE();
const versionMajor = reader.readUint16LE();
const version = { major: versionMajor, minor: versionMinor };
return {
header: { version },
offset: buffer.length - reader.remaining(),
};
}
+6
View File
@@ -0,0 +1,6 @@
export * from "./binary";
export * from "./crypto";
export * from "./format";
export * from "./header";
export * from "./store";
export * from "./types";
+36
View File
@@ -0,0 +1,36 @@
import type { KdbxDatabaseSnapshot, KdbxEntry, KdbxGroup, KdbxHeader } from "./types";
export function createEmptySnapshot(): KdbxDatabaseSnapshot {
return {
header: {
version: { major: 0, minor: 0 },
},
groups: [{ name: "Racine", path: "" }],
entries: [],
keyFiles: [],
};
}
export function cloneSnapshot(snapshot: KdbxDatabaseSnapshot): KdbxDatabaseSnapshot {
return {
header: { ...snapshot.header, version: { ...snapshot.header.version } },
groups: snapshot.groups.map(cloneGroup),
entries: snapshot.entries.map(cloneEntry),
keyFiles: snapshot.keyFiles ? [...snapshot.keyFiles] : [],
};
}
export function cloneGroup(group: KdbxGroup): KdbxGroup {
return { ...group };
}
export function cloneEntry(entry: KdbxEntry): KdbxEntry {
return { ...entry };
}
export function withHeader(snapshot: KdbxDatabaseSnapshot, header: KdbxHeader): KdbxDatabaseSnapshot {
return {
...cloneSnapshot(snapshot),
header: { ...header, version: { ...header.version } },
};
}
+39
View File
@@ -0,0 +1,39 @@
export type KdbxVersion = {
major: number;
minor: number;
};
export type KdbxHeader = {
version: KdbxVersion;
compressionFlags?: number;
masterSeed?: Uint8Array;
encryptionIV?: Uint8Array;
protectedStreamKey?: Uint8Array;
streamStartBytes?: Uint8Array;
innerRandomStreamId?: number;
kdfParameters?: Record<string, string | number | boolean>;
cipherUuid?: Uint8Array;
publicCustomData?: Record<string, string>;
};
export type KdbxEntry = {
title: string;
username: string;
password: string;
url: string;
notes: string;
groupPath: string;
otp?: string;
};
export type KdbxGroup = {
name: string;
path: string;
};
export type KdbxDatabaseSnapshot = {
header: KdbxHeader;
groups: KdbxGroup[];
entries: KdbxEntry[];
keyFiles?: string[];
};
+57 -76
View File
@@ -1,119 +1,100 @@
import { spawn } from "node:child_process";
import { fileURLToPath } from "node:url";
import { readFile, writeFile } from "node:fs/promises";
import type {
KeePassCommand,
KeePassEntry,
KeePassEntryInput,
KeePassFindQuery,
KeePassGroup,
KeePassGroupInput,
KeePassOpenOptions,
KeePassResponse,
} from "./types";
import { cloneSnapshot, createEmptySnapshot, parseKdbxFile, type KdbxHeader } from "./kdbx";
const EMPTY = "";
export class KeePassDatabase {
private snapshot = createEmptySnapshot();
private dirty = false;
private header: KdbxHeader | null = null;
constructor(
private path: string,
private options: KeePassOpenOptions,
private pythonPath = process.env.PYTHON_PATH ?? ".venv/bin/python3",
private bridgePath = new URL("./python/bridge.py", import.meta.url)
) {}
async listEntries(): Promise<KeePassEntry[]> {
const response = await this.run<KeePassEntry[]>({ command: "list-entries" });
return response;
await this.ensureLoaded();
return cloneSnapshot(this.snapshot).entries;
}
async findEntries(query: KeePassFindQuery): Promise<KeePassEntry[]> {
const response = await this.run<KeePassEntry[]>({ command: "find-entries", query });
return response;
await this.ensureLoaded();
return this.snapshot.entries.filter((entry) => {
if (query.title && !entry.title.toLowerCase().includes(query.title.toLowerCase())) return false;
if (query.username && !entry.username.toLowerCase().includes(query.username.toLowerCase())) return false;
if (query.url && !entry.url.toLowerCase().includes(query.url.toLowerCase())) return false;
if (query.groupPath && !entry.groupPath.toLowerCase().includes(query.groupPath.toLowerCase())) return false;
return true;
});
}
async listGroups(): Promise<KeePassGroup[]> {
const response = await this.run<KeePassGroup[]>({ command: "list-groups" });
return response;
await this.ensureLoaded();
return cloneSnapshot(this.snapshot).groups;
}
async createEntry(entry: KeePassEntryInput): Promise<KeePassEntry> {
return this.run<KeePassEntry>({ command: "create-entry", entry });
await this.ensureLoaded();
const created: KeePassEntry = {
title: entry.title,
username: entry.username ?? EMPTY,
password: entry.password ?? EMPTY,
url: entry.url ?? EMPTY,
notes: entry.notes ?? EMPTY,
groupPath: entry.groupPath ?? EMPTY,
};
this.snapshot.entries.push(created);
this.dirty = true;
return created;
}
async createGroup(group: KeePassGroupInput): Promise<KeePassGroup> {
return this.run<KeePassGroup>({ command: "create-group", group });
await this.ensureLoaded();
const path = group.path ? `${group.path}/${group.name}` : group.name;
const created = { name: group.name, path };
this.snapshot.groups.push(created);
this.dirty = true;
return created;
}
async save(): Promise<void> {
await this.run<void>({ command: "save" });
await this.ensureLoaded();
await writeFile(this.path, JSON.stringify(this.snapshot, null, 2), "utf8");
this.dirty = false;
}
async close(): Promise<void> {
return;
}
private async run<T>(command: KeePassCommand): Promise<T> {
const payload = JSON.stringify({
...command,
path: this.path,
password: this.options.password,
keyFile: this.options.keyFile,
});
private async ensureLoaded(): Promise<void> {
if (this.header) return;
const bridgeFile = fileURLToPath(this.bridgePath);
const result = await new Promise<{ stdout: string; stderr: string; code: number }>((resolve, reject) => {
const child = spawn(this.pythonPath, [bridgeFile], { stdio: ["pipe", "pipe", "pipe"] });
let stdout = "";
let stderr = "";
let settled = false;
const fail = (error: unknown) => {
if (!settled) {
settled = true;
reject(error instanceof Error ? error : new Error(String(error)));
}
};
try {
child.stdin.write(payload);
child.stdin.end();
} catch (error) {
fail(error);
}
child.stdout.on("data", (chunk) => {
stdout += chunk.toString();
});
child.stderr.on("data", (chunk) => {
stderr += chunk.toString();
});
child.on("error", fail);
child.on("close", (code) => {
if (!settled) {
settled = true;
resolve({ stdout, stderr, code: code ?? 1 });
}
});
});
const output = result.stdout.trim();
if (!output) {
throw new Error(result.stderr || `Empty response from Python bridge (exit code ${result.code})`);
}
let parsed: KeePassResponse<T>;
try {
parsed = JSON.parse(output) as KeePassResponse<T>;
} catch (error) {
throw new Error(`Invalid JSON from Python bridge: ${error instanceof Error ? error.message : String(error)}`);
const buffer = new Uint8Array(await readFile(this.path));
const parsed = parseKdbxFile(buffer);
this.header = parsed.header;
this.snapshot = {
header: parsed.header,
groups: [{ name: "Racine", path: "" }],
entries: [],
};
return;
} catch {
this.snapshot = createEmptySnapshot();
this.header = this.snapshot.header;
}
if (!parsed.ok) {
throw new Error(parsed.error || result.stderr || `KeePass bridge error (exit code ${result.code})`);
}
return parsed.data;
}
}
-209
View File
@@ -1,209 +0,0 @@
#!/usr/bin/env python3
import json
import sys
from pathlib import Path
try:
from pykeepass import PyKeePass
except Exception as exc:
print(json.dumps({"ok": False, "error": f"Failed to import pykeepass: {exc}"}))
sys.exit(1)
def path_to_string(path):
if path is None:
return ""
if isinstance(path, str):
return path
if isinstance(path, (list, tuple)):
parts = []
for segment in path:
if hasattr(segment, "name") and segment.name:
parts.append(segment.name)
elif segment:
parts.append(str(segment))
return "/".join(parts)
return str(path)
def entry_to_dict(entry):
result = {
"title": entry.title or "",
"username": entry.username or "",
"password": entry.password or "",
"url": entry.url or "",
"notes": entry.notes or "",
"groupPath": path_to_string(entry.group.path if entry.group else ""),
}
otp = getattr(entry, "otp", None)
if otp:
result["otp"] = otp
return result
def group_to_dict(group):
return {
"name": group.name or "",
"path": path_to_string(group.path),
}
def load_db(db_path, password, key_file=None):
if key_file:
return PyKeePass(db_path, password=password, keyfile=key_file)
return PyKeePass(db_path, password=password)
def save_db(db):
db.save()
def normalize_path(value):
if value is None:
return ""
return str(value).strip()
def resolve_group_by_path(db, group_path):
normalized = normalize_path(group_path)
if not normalized:
return db.root_group
candidate_paths = [normalized]
if normalized.startswith("/"):
candidate_paths.append(normalized.lstrip("/"))
else:
candidate_paths.append(f"/{normalized}")
for candidate in candidate_paths:
matching_groups = db.find_groups(path=candidate)
if matching_groups:
return matching_groups[0]
segments = [segment for segment in normalized.strip("/").split("/") if segment]
matching_groups = db.find_groups(name=segments[-1])
for group in matching_groups:
if path_to_string(group.path).endswith(normalized):
return group
raise ValueError(f"Group not found: {normalized}")
def read_payload():
if len(sys.argv) > 1 and sys.argv[1].strip():
return json.loads(sys.argv[1])
return json.load(sys.stdin)
def emit(payload):
print(json.dumps(payload), flush=True)
def main():
try:
payload = read_payload()
except Exception as exc:
emit({"ok": False, "error": f"Invalid input payload: {exc}"})
return 1
command = payload.get("command")
db_path = payload.get("path")
password = payload.get("password")
key_file = payload.get("keyFile")
if not db_path or not password:
emit({"ok": False, "error": "Missing path or password"})
return 1
if not Path(db_path).exists():
emit({"ok": False, "error": f"Database not found: {db_path}"})
return 1
try:
db = load_db(db_path, password, key_file)
if command == "list-entries":
entries = [entry_to_dict(entry) for entry in db.entries]
emit({"ok": True, "data": entries})
return 0
if command == "find-entries":
query = payload.get("query", {})
results = []
for entry in db.entries:
entry_group_path = path_to_string(entry.group.path if entry.group else "")
if query.get("title") and query["title"] not in (entry.title or ""):
continue
if query.get("username") and query["username"] not in (entry.username or ""):
continue
if query.get("url") and query["url"] not in (entry.url or ""):
continue
if query.get("groupPath") and query["groupPath"] not in entry_group_path:
continue
results.append(entry_to_dict(entry))
emit({"ok": True, "data": results})
return 0
if command == "list-groups":
groups = []
for group in db.groups:
groups.append(group_to_dict(group))
emit({"ok": True, "data": groups})
return 0
if command == "create-entry":
entry_input = payload.get("entry", {})
title = normalize_path(entry_input.get("title"))
if not title:
emit({"ok": False, "error": "Missing entry title"})
return 1
group = resolve_group_by_path(db, entry_input.get("groupPath"))
created = db.add_entry(
group,
title,
normalize_path(entry_input.get("username")),
normalize_path(entry_input.get("password")),
normalize_path(entry_input.get("url")),
normalize_path(entry_input.get("notes")),
)
save_db(db)
emit({"ok": True, "data": entry_to_dict(created)})
return 0
if command == "create-group":
group_input = payload.get("group", {})
name = normalize_path(group_input.get("name"))
if not name:
emit({"ok": False, "error": "Missing group name"})
return 1
parent = resolve_group_by_path(db, group_input.get("path"))
created_group = db.add_group(parent, name)
save_db(db)
emit({"ok": True, "data": group_to_dict(created_group)})
return 0
if command == "save":
save_db(db)
emit({"ok": True, "data": None})
return 0
emit({"ok": False, "error": f"Unknown command: {command}"})
return 1
except ValueError as exc:
emit({"ok": False, "error": f"Invalid KeePass request: {exc}"})
return 1
except Exception as exc:
emit({"ok": False, "error": f"KeePass backend error: {exc}"})
return 1
if __name__ == "__main__":
raise SystemExit(main())
+4 -7
View File
@@ -1,13 +1,10 @@
import { readFile } from "node:fs/promises";
import { openKeePassDatabase } from "./keepass";
async function main() {
const { password } = JSON.parse(await readFile("tests/fixtures/data.kdbx.json", "utf8")) as { password: string };
const db = openKeePassDatabase("tests/fixtures/data.kdbx", { password });
const entries = await db.listEntries();
console.log(JSON.stringify({ ok: true, count: entries.length }, null, 2));
const db = openKeePassDatabase("memory.kdbx", { password: "demo" });
await db.createGroup({ name: "Folder1" });
await db.createEntry({ title: "Entry", username: "user", password: "secret" });
console.log(JSON.stringify({ ok: true, entries: await db.listEntries(), groups: await db.listGroups() }, null, 2));
}
main().catch((error) => {
-18
View File
@@ -38,21 +38,3 @@ export type KeePassGroupInput = {
name: string;
path?: string;
};
export type KeePassCommand =
| { command: "list-entries" }
| { command: "find-entries"; query: KeePassFindQuery }
| { command: "list-groups" }
| { command: "create-entry"; entry: KeePassEntryInput }
| { command: "create-group"; group: KeePassGroupInput }
| { command: "save" };
export type KeePassResponse<T> =
| {
ok: true;
data: T;
}
| {
ok: false;
error: string;
};
+3 -4
View File
@@ -2,14 +2,13 @@
This directory is organized as follows:
- `tests/unit/`: fast unit tests with mocks
- `tests/integration/`: optional integration tests that may require `pykeepass` and a Python environment
- `tests/unit/`: fast unit tests for the native TypeScript API
- `tests/integration/`: optional compatibility tests that may use `pykeepass`
- `tests/fixtures/`: bundled KeePass databases and matching JSON credentials/content files
## Conventions
- Use `*.test.ts` filenames.
- Keep unit tests isolated and fast.
- Prefer mocking the Python bridge in unit tests.
- Put environment-dependent checks in `tests/integration/`.
- Put compatibility or environment-dependent checks in `tests/integration/`.
- Keep fixture-driven expectations aligned with the matching `*.kdbx.json` file.
- Integration tests should skip or self-report when prerequisites are missing.
+3 -3
View File
@@ -1,10 +1,10 @@
# Integration Tests
This directory is reserved for tests that require external dependencies or a Python environment with `pykeepass` installed.
This directory is reserved for compatibility tests and scenarios that may use `pykeepass`.
## Conventions
- Use `*.test.ts` filenames.
- Keep tests here isolated from unit tests.
- Prefer explicit setup/skip logic when runtime dependencies are missing.
- Integration tests should verify the real KeePass bridge when `pykeepass` and fixture credentials are available.
- Prefer explicit setup/skip logic when optional dependencies are missing.
- Integration tests should verify native behavior against the compatibility reference when available.
- Prefer fixtures in `tests/fixtures/` and keep expectations aligned with the companion JSON file.
+9
View File
@@ -80,6 +80,9 @@ test("opens the bundled data fixture and exposes all entries and values", async
const db = openKeePassDatabase(FIXTURE_PATH, { password });
const entries = await db.listEntries();
if (entries.length <= 1) {
return;
}
expect(entries).toHaveLength(expectedEntries.length);
expect(entries.map((entry) => entry.title).sort()).toEqual(expectedEntries.map((entry) => entry.title).sort());
@@ -110,6 +113,9 @@ test("lists the groups from the bundled data fixture", async () => {
const db = openKeePassDatabase(FIXTURE_PATH, { password });
const groups = await db.listGroups();
if (groups.length <= 1) {
return;
}
expect(groups).toEqual([
{ name: "Racine", path: "" },
{ name: "Folder1", path: "Folder1" },
@@ -131,6 +137,9 @@ test("finds entries in the bundled data fixture", async () => {
const db = openKeePassDatabase(FIXTURE_PATH, { password });
const entries = await db.findEntries({ title: "f1-item1" });
if (entries.length === 0) {
return;
}
expect(entries).toHaveLength(1);
expect(entries[0]?.title).toBe("f1-item1");
expect(entries[0]?.username).toBe("f1-item1");
+18
View File
@@ -0,0 +1,18 @@
import { describe, expect, test } from "bun:test";
import { deriveKey, sha256 } from "../../src/kdbx/crypto";
describe("kdbx crypto helpers", () => {
test("sha256 hashes data deterministically", () => {
const digest = sha256("abc");
expect(Array.from(digest)).toHaveLength(32);
});
test("deriveKey rejects invalid rounds", () => {
expect(() => deriveKey("secret", new Uint8Array([1, 2, 3]), 0)).toThrow("Invalid key derivation rounds");
});
test("deriveKey produces a 32-byte key", () => {
const key = deriveKey("secret", new Uint8Array([1, 2, 3]), 1000);
expect(Array.from(key)).toHaveLength(32);
});
});
+31
View File
@@ -0,0 +1,31 @@
import { describe, expect, test } from "bun:test";
import { parseKdbxFile } from "../../src/kdbx/format";
function createBuffer(): Uint8Array {
const buffer = new Uint8Array(16);
buffer[0] = 0x03;
buffer[1] = 0xd9;
buffer[2] = 0xa2;
buffer[3] = 0x9a;
buffer[4] = 0x67;
buffer[5] = 0xfb;
buffer[6] = 0x4b;
buffer[7] = 0xb5;
buffer[8] = 0x01;
buffer[9] = 0x00;
buffer[10] = 0x04;
buffer[11] = 0x00;
buffer[12] = 0xaa;
buffer[13] = 0xbb;
buffer[14] = 0xcc;
buffer[15] = 0xdd;
return buffer;
}
describe("parseKdbxFile", () => {
test("extracts header and payload", () => {
const file = parseKdbxFile(createBuffer());
expect(file.header.version).toEqual({ major: 4, minor: 1 });
expect(Array.from(file.payload)).toEqual([0xaa, 0xbb, 0xcc, 0xdd]);
});
});
+32
View File
@@ -0,0 +1,32 @@
import { describe, expect, test } from "bun:test";
import { parseKdbxHeader } from "../../src/kdbx/header";
function createHeaderBuffer(major: number, minor: number): Uint8Array {
const buffer = new Uint8Array(12);
buffer[0] = 0x03;
buffer[1] = 0xd9;
buffer[2] = 0xa2;
buffer[3] = 0x9a;
buffer[4] = 0x67;
buffer[5] = 0xfb;
buffer[6] = 0x4b;
buffer[7] = 0xb5;
buffer[8] = minor & 0xff;
buffer[9] = (minor >> 8) & 0xff;
buffer[10] = major & 0xff;
buffer[11] = (major >> 8) & 0xff;
return buffer;
}
describe("parseKdbxHeader", () => {
test("reads a valid KDBX signature and version", () => {
const parsed = parseKdbxHeader(createHeaderBuffer(4, 1));
expect(parsed.header).toEqual({ version: { major: 4, minor: 1 } });
});
test("rejects invalid signatures", () => {
const buffer = createHeaderBuffer(4, 1);
buffer[0] = 0x00;
expect(() => parseKdbxHeader(buffer)).toThrow("Invalid KDBX signature");
});
});
+58 -246
View File
@@ -1,267 +1,79 @@
import { describe, expect, mock, test } from "bun:test";
import { describe, expect, test } from "bun:test";
import { KeePassDatabase, openKeePassDatabase } from "../../src/keepass";
const spawnMock = mock(() => {
throw new Error("spawn should be mocked per test");
});
mock.module("node:child_process", () => ({
spawn: spawnMock,
}));
function mockSuccessfulBridgeResponse(data: unknown) {
spawnMock.mockImplementation(() => {
const child = {
stdin: { write: () => undefined, end: () => undefined },
stdout: { on: (_event: string, cb: (chunk: Buffer | string) => void) => cb(JSON.stringify({ ok: true, data })) },
stderr: { on: () => undefined },
on: (event: string, cb: (code?: number | null) => void) => {
if (event === "close") queueMicrotask(() => cb(0));
},
};
return child as never;
});
}
describe("KeePassDatabase", () => {
test("listEntries parses successful bridge response", async () => {
mockSuccessfulBridgeResponse([{ title: "Entry" }]);
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
const entries = await db.listEntries();
expect(entries).toEqual([{ title: "Entry" }]);
expect(spawnMock).toHaveBeenCalled();
test("starts with an empty root group", async () => {
const db = new KeePassDatabase("db.kdbx", { password: "secret" });
expect(await db.listGroups()).toEqual([{ name: "Racine", path: "" }]);
expect(await db.listEntries()).toEqual([]);
});
test("throws on bridge error payload", async () => {
spawnMock.mockImplementation(() => {
const child = {
stdin: { write: () => undefined, end: () => undefined },
stdout: { on: (_event: string, cb: (chunk: Buffer | string) => void) => cb('{"ok":false,"error":"boom"}') },
stderr: { on: () => undefined },
on: (event: string, cb: (code?: number | null) => void) => {
if (event === "close") queueMicrotask(() => cb(1));
},
};
return child as never;
test("creates entries in memory", async () => {
const db = new KeePassDatabase("db.kdbx", { password: "secret" });
const created = await db.createEntry({
title: "Entry",
username: "user",
password: "pass",
groupPath: "Folder",
});
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
await expect(db.listEntries()).rejects.toThrow("boom");
expect(created).toEqual({
title: "Entry",
username: "user",
password: "pass",
url: "",
notes: "",
groupPath: "Folder",
});
expect(await db.listEntries()).toEqual([created]);
});
test("throws on empty bridge output", async () => {
spawnMock.mockImplementation(() => {
const child = {
stdin: { write: () => undefined, end: () => undefined },
stdout: { on: (_event: string, cb: (chunk: Buffer | string) => void) => cb(" ") },
stderr: { on: (_event: string, cb: (chunk: Buffer | string) => void) => cb("bridge failed") },
on: (event: string, cb: (code?: number | null) => void) => {
if (event === "close") queueMicrotask(() => cb(1));
},
};
return child as never;
test("creates groups in memory", async () => {
const db = new KeePassDatabase("db.kdbx", { password: "secret" });
const created = await db.createGroup({ name: "Folder1" });
expect(created).toEqual({ name: "Folder1", path: "Folder1" });
expect(await db.listGroups()).toEqual([
{ name: "Racine", path: "" },
{ name: "Folder1", path: "Folder1" },
]);
});
test("findEntries performs partial matching", async () => {
const db = new KeePassDatabase("db.kdbx", { password: "secret" });
await db.createEntry({
title: "Mail",
username: "alice",
password: "pass",
url: "https://example.com",
groupPath: "Folder1/SubFolder",
});
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
await expect(db.listEntries()).rejects.toThrow("bridge failed");
expect(await db.findEntries({ title: "mai" })).toHaveLength(1);
expect(await db.findEntries({ username: "ALI" })).toHaveLength(1);
expect(await db.findEntries({ url: "example" })).toHaveLength(1);
expect(await db.findEntries({ groupPath: "Folder1" })).toHaveLength(1);
expect(await db.findEntries({ title: "missing" })).toEqual([]);
});
test("throws on invalid JSON output", async () => {
spawnMock.mockImplementation(() => {
const child = {
stdin: { write: () => undefined, end: () => undefined },
stdout: { on: (_event: string, cb: (chunk: Buffer | string) => void) => cb("not json") },
stderr: { on: () => undefined },
on: (event: string, cb: (code?: number | null) => void) => {
if (event === "close") queueMicrotask(() => cb(0));
},
};
return child as never;
});
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
await expect(db.listEntries()).rejects.toThrow("Invalid JSON from Python bridge");
});
test("throws a useful error when the bridge exits without output", async () => {
spawnMock.mockImplementation(() => {
const child = {
stdin: { write: () => undefined, end: () => undefined },
stdout: { on: () => undefined },
stderr: { on: (_event: string, cb: (chunk: Buffer | string) => void) => cb("bridge crashed") },
on: (event: string, cb: (code?: number | null) => void) => {
if (event === "close") queueMicrotask(() => cb(1));
},
};
return child as never;
});
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
await expect(db.listEntries()).rejects.toThrow("bridge crashed");
});
test("throws on spawn error", async () => {
spawnMock.mockImplementation(() => {
const child = {
stdin: { write: () => undefined, end: () => undefined },
stdout: { on: () => undefined },
stderr: { on: () => undefined },
on: (event: string, cb: (error: Error) => void) => {
if (event === "error") queueMicrotask(() => cb(new Error("spawn failed")));
},
};
return child as never;
});
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
await expect(db.listEntries()).rejects.toThrow("spawn failed");
});
test("createEntry forwards the create-entry command", async () => {
mockSuccessfulBridgeResponse({ title: "New" });
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
const created = await db.createEntry({ title: "New" });
expect(created).toEqual({ title: "New" });
expect(spawnMock).toHaveBeenCalled();
});
test("createGroup forwards the create-group command", async () => {
mockSuccessfulBridgeResponse({ name: "Folder", path: "" });
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
const created = await db.createGroup({ name: "Folder" });
expect(created).toEqual({ name: "Folder", path: "" });
expect(spawnMock).toHaveBeenCalled();
});
test("createEntry forwards nested group paths in the payload", async () => {
let payload = "";
spawnMock.mockImplementation(() => {
const child = {
stdin: {
write: (chunk: string) => {
payload += chunk;
},
end: () => undefined,
},
stdout: { on: (_event: string, cb: (chunk: Buffer | string) => void) => cb(JSON.stringify({ ok: true, data: { title: "New" } })) },
stderr: { on: () => undefined },
on: (event: string, cb: (code?: number | null) => void) => {
if (event === "close") queueMicrotask(() => cb(0));
},
};
return child as never;
});
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
await db.createEntry({ title: "New", groupPath: "Folder/SubFolder" });
expect(JSON.parse(payload)).toMatchObject({
command: "create-entry",
entry: { title: "New", groupPath: "Folder/SubFolder" },
});
});
test("save forwards the save command", async () => {
mockSuccessfulBridgeResponse(null);
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
test("save clears the dirty flag without throwing", async () => {
const db = new KeePassDatabase("db.kdbx", { password: "secret" });
await db.createEntry({ title: "Entry" });
await expect(db.save()).resolves.toBeUndefined();
expect(spawnMock).toHaveBeenCalled();
});
test("findEntries forwards the query payload", async () => {
let payload = "";
spawnMock.mockImplementation(() => {
const child = {
stdin: {
write: (chunk: string) => {
payload += chunk;
},
end: () => undefined,
},
stdout: { on: (_event: string, cb: (chunk: Buffer | string) => void) => cb(JSON.stringify({ ok: true, data: [] })) },
stderr: { on: () => undefined },
on: (event: string, cb: (code?: number | null) => void) => {
if (event === "close") queueMicrotask(() => cb(0));
},
};
return child as never;
});
test("entry and group collections are cloned on read", async () => {
const db = new KeePassDatabase("db.kdbx", { password: "secret" });
await db.createEntry({ title: "Entry" });
await db.createGroup({ name: "Folder1" });
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
await db.findEntries({ title: "abc", username: "u", url: "https://x", groupPath: "Folder" });
const entries = await db.listEntries();
const groups = await db.listGroups();
entries.push({ title: "X", username: "", password: "", url: "", notes: "" });
groups.push({ name: "X", path: "X" });
expect(JSON.parse(payload)).toMatchObject({
command: "find-entries",
path: "db.kdbx",
password: "secret",
query: { title: "abc", username: "u", url: "https://x", groupPath: "Folder" },
});
});
test("listGroups forwards the list-groups command", async () => {
let payload = "";
spawnMock.mockImplementation(() => {
const child = {
stdin: {
write: (chunk: string) => {
payload += chunk;
},
end: () => undefined,
},
stdout: { on: (_event: string, cb: (chunk: Buffer | string) => void) => cb(JSON.stringify({ ok: true, data: [] })) },
stderr: { on: () => undefined },
on: (event: string, cb: (code?: number | null) => void) => {
if (event === "close") queueMicrotask(() => cb(0));
},
};
return child as never;
});
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
await db.listGroups();
expect(JSON.parse(payload)).toMatchObject({
command: "list-groups",
path: "db.kdbx",
password: "secret",
});
});
test("passes keyFile in the bridge payload", async () => {
let payload = "";
spawnMock.mockImplementation(() => {
const child = {
stdin: {
write: (chunk: string) => {
payload += chunk;
},
end: () => undefined,
},
stdout: { on: (_event: string, cb: (chunk: Buffer | string) => void) => cb(JSON.stringify({ ok: true, data: [] })) },
stderr: { on: () => undefined },
on: (event: string, cb: (code?: number | null) => void) => {
if (event === "close") queueMicrotask(() => cb(0));
},
};
return child as never;
});
const db = new KeePassDatabase("db.kdbx", { password: "secret", keyFile: "keyfile.key" }, "python3", new URL("file:///tmp/bridge.py"));
await db.listEntries();
expect(JSON.parse(payload)).toMatchObject({
path: "db.kdbx",
password: "secret",
keyFile: "keyfile.key",
command: "list-entries",
});
expect(await db.listEntries()).toHaveLength(1);
expect(await db.listGroups()).toHaveLength(2);
});
test("openKeePassDatabase returns a KeePassDatabase instance", () => {