Initial commit

This commit is contained in:
2026-05-09 23:50:24 +02:00
commit 0d25e52ebc
21 changed files with 867 additions and 0 deletions
+15
View File
@@ -0,0 +1,15 @@
import { readFile } from "node:fs/promises";
import { openKeePassDatabase } from "./keepass";
async function main() {
const { password } = JSON.parse(await readFile("tests/fixtures/data.kdbx.json", "utf8")) as { password: string };
const db = openKeePassDatabase("tests/fixtures/data.kdbx", { password });
const entries = await db.listEntries();
console.log(entries);
}
main().catch((error) => {
console.error(error);
process.exit(1);
});
+9
View File
@@ -0,0 +1,9 @@
export { KeePassDatabase, openKeePassDatabase } from "./keepass";
export type {
KeePassCommand,
KeePassEntry,
KeePassFindQuery,
KeePassGroup,
KeePassOpenOptions,
KeePassResponse,
} from "./types";
+110
View File
@@ -0,0 +1,110 @@
import { spawn } from "node:child_process";
import { fileURLToPath } from "node:url";
import type {
KeePassCommand,
KeePassEntry,
KeePassFindQuery,
KeePassGroup,
KeePassOpenOptions,
KeePassResponse,
} from "./types";
export class KeePassDatabase {
constructor(
private readonly path: string,
private readonly options: KeePassOpenOptions,
private readonly pythonPath = "python3",
private readonly bridgePath = new URL("./python/bridge.py", import.meta.url)
) {}
async listEntries(): Promise<KeePassEntry[]> {
const response = await this.run<KeePassEntry[]>({ command: "list-entries" });
return response;
}
async findEntries(query: KeePassFindQuery): Promise<KeePassEntry[]> {
const response = await this.run<KeePassEntry[]>({ command: "find-entries", query });
return response;
}
async listGroups(): Promise<KeePassGroup[]> {
const response = await this.run<KeePassGroup[]>({ command: "list-groups" });
return response;
}
async close(): Promise<void> {
// No persistent process is kept alive yet.
return;
}
private async run<T>(command: KeePassCommand): Promise<T> {
const payload = JSON.stringify({
...command,
path: this.path,
password: this.options.password,
keyFile: this.options.keyFile,
});
const bridgeFile = fileURLToPath(this.bridgePath);
const result = await new Promise<{ stdout: string; stderr: string; code: number }>((resolve, reject) => {
const child = spawn(this.pythonPath, [bridgeFile], {
stdio: ["pipe", "pipe", "pipe"],
});
let stdout = "";
let stderr = "";
let settled = false;
try {
child.stdin.write(payload);
child.stdin.end();
} catch (error) {
reject(error);
}
child.stdout.on("data", (chunk) => {
stdout += chunk.toString();
});
child.stderr.on("data", (chunk) => {
stderr += chunk.toString();
});
child.on("error", (error) => {
if (!settled) {
settled = true;
reject(error);
}
});
child.on("close", (code) => {
if (!settled) {
settled = true;
resolve({ stdout, stderr, code: code ?? 1 });
}
});
});
const output = result.stdout.trim();
if (!output) {
throw new Error(result.stderr || `Empty response from Python bridge (exit code ${result.code})`);
}
let parsed: KeePassResponse<T>;
try {
parsed = JSON.parse(output) as KeePassResponse<T>;
} catch (error) {
throw new Error(`Invalid JSON from Python bridge: ${error instanceof Error ? error.message : String(error)}`);
}
if (!parsed.ok) {
throw new Error(parsed.error || result.stderr || `KeePass bridge error (exit code ${result.code})`);
}
return parsed.data;
}
}
export function openKeePassDatabase(path: string, options: KeePassOpenOptions) {
return new KeePassDatabase(path, options);
}
+135
View File
@@ -0,0 +1,135 @@
#!/usr/bin/env python3
import json
import sys
from pathlib import Path
try:
from pykeepass import PyKeePass
except Exception as exc:
print(json.dumps({"ok": False, "error": f"Failed to import pykeepass: {exc}"}))
sys.exit(1)
def path_to_string(path):
if path is None:
return ""
if isinstance(path, str):
return path
if isinstance(path, (list, tuple)):
parts = []
for segment in path:
if hasattr(segment, "name") and segment.name:
parts.append(segment.name)
elif segment:
parts.append(str(segment))
return "/".join(parts)
return str(path)
def entry_to_dict(entry):
result = {
"title": entry.title or "",
"username": entry.username or "",
"password": entry.password or "",
"url": entry.url or "",
"notes": entry.notes or "",
"groupPath": path_to_string(entry.group.path if entry.group else ""),
}
otp = getattr(entry, "otp", None)
if otp:
result["otp"] = otp
return result
def group_to_dict(group):
return {
"name": group.name or "",
"path": path_to_string(group.path),
}
def load_db(db_path, password, key_file=None):
if key_file:
return PyKeePass(db_path, password=password, keyfile=key_file)
return PyKeePass(db_path, password=password)
def read_payload():
if len(sys.argv) > 1 and sys.argv[1].strip():
return json.loads(sys.argv[1])
return json.load(sys.stdin)
def emit(payload):
print(json.dumps(payload), flush=True)
def main():
try:
payload = read_payload()
except Exception as exc:
emit({"ok": False, "error": f"Invalid input payload: {exc}"})
return 1
command = payload.get("command")
db_path = payload.get("path")
password = payload.get("password")
key_file = payload.get("keyFile")
if not db_path or not password:
emit({"ok": False, "error": "Missing path or password"})
return 1
if not Path(db_path).exists():
emit({"ok": False, "error": f"Database not found: {db_path}"})
return 1
try:
db = load_db(db_path, password, key_file)
if command == "list-entries":
entries = [entry_to_dict(entry) for entry in db.entries]
emit({"ok": True, "data": entries})
return 0
if command == "find-entries":
query = payload.get("query", {})
results = []
for entry in db.entries:
entry_group_path = path_to_string(entry.group.path if entry.group else "")
if query.get("title") and query["title"] not in (entry.title or ""):
continue
if query.get("username") and query["username"] not in (entry.username or ""):
continue
if query.get("url") and query["url"] not in (entry.url or ""):
continue
if query.get("groupPath") and query["groupPath"] not in entry_group_path:
continue
results.append(entry_to_dict(entry))
emit({"ok": True, "data": results})
return 0
if command == "list-groups":
groups = []
for group in db.groups:
groups.append(group_to_dict(group))
emit({"ok": True, "data": groups})
return 0
emit({"ok": False, "error": f"Unknown command: {command}"})
return 1
except Exception as exc:
emit({"ok": False, "error": str(exc)})
return 1
if __name__ == "__main__":
raise SystemExit(main())
+16
View File
@@ -0,0 +1,16 @@
import { readFile } from "node:fs/promises";
import { openKeePassDatabase } from "./keepass";
async function main() {
const { password } = JSON.parse(await readFile("tests/fixtures/data.kdbx.json", "utf8")) as { password: string };
const db = openKeePassDatabase("tests/fixtures/data.kdbx", { password });
const entries = await db.listEntries();
console.log(JSON.stringify({ ok: true, count: entries.length }, null, 2));
}
main().catch((error) => {
console.error(error);
process.exit(1);
});
+41
View File
@@ -0,0 +1,41 @@
export type KeePassEntry = {
title: string;
username: string;
password: string;
url: string;
notes: string;
groupPath?: string;
otp?: string;
};
export type KeePassGroup = {
name: string;
path: string;
};
export type KeePassOpenOptions = {
password: string;
keyFile?: string;
};
export type KeePassFindQuery = {
title?: string;
username?: string;
url?: string;
groupPath?: string;
};
export type KeePassCommand =
| { command: "list-entries" }
| { command: "find-entries"; query: KeePassFindQuery }
| { command: "list-groups" };
export type KeePassResponse<T> =
| {
ok: true;
data: T;
}
| {
ok: false;
error: string;
};