Coverage for src/ragindexer/DocumentIndexer.py: 56%

116 statements  

coverage.py v7.8.2, created at 2025-06-20 15:57 +0000

import os
import time
import threading
from pathlib import Path
from typing import Iterable, List, Tuple

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemEvent

from sentence_transformers import SentenceTransformer
from qdrant_client.models import (
    Filter,
    FieldCondition,
    MatchValue,
)

from .documents.DocumentFactory import DocumentFactory
from . import logger
from .index_database import (
    delete_stored_file,
    get_stored_timestamp,
    set_stored_timestamp,
    list_stored_files,
)
from .config import config
from .QdrantIndexer import QdrantIndexer
from .models import ChunkType, EmbeddingType


class DocumentIndexer:
    """
    Object that reacts to filesystem events (document creation/modification/deletion)
    and updates the databases

    """

    def __init__(self):
        # Load embedding model
        self.model = SentenceTransformer(
            config.EMBEDDING_MODEL,
            trust_remote_code=config.EMBEDDING_MODEL_TRUST_REMOTE_CODE,
            backend="torch",
            cache_folder=config.STATE_DB_PATH.parent / "models",
        )
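        # The model's embedding dimension is used to size the Qdrant collection below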
        self.vector_size = self.model.get_sentence_embedding_dimension()
        self.doc_factory = DocumentFactory()
        self.doc_factory.set_embedding_model(self.model)

        # Initialize Qdrant
        self.qdrant = QdrantIndexer(vector_size=self.vector_size)

        # Lock around state & indexing operations
        self.lock = threading.Lock()

    def extract_text(
        self, abspath: Path
    ) -> Iterable[Tuple[int, List[ChunkType], List[EmbeddingType], dict]]:
        """Extract chunks, embeddings and metadata from a file path

        Args:
            abspath: Path to a file to analyse

        Yields:
            A tuple with the page index, the list of chunks, the corresponding list
            of embeddings, and the file metadata

        """
        yield from self.doc_factory.processDocument(abspath)

    def process_file(self, filepath: Path, force: bool = False):
        """
        Extract text, chunk, embed, and upsert into Qdrant.

        Args:
            filepath: Path to the file to be analyzed
            force: True to process the file even if the database says that it has
                already been processed

        """
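        # Compare the file's mtime against the timestamp stored in the state DB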
        mtime = os.path.getmtime(filepath)
        stored = get_stored_timestamp(filepath)
        if (stored is not None and stored == mtime) and not force:
            # No change
            return

        logger.info(72 * "=")
        logger.info(f"[INDEX] Processing changed file: '{filepath}'")
        nb_emb = 0
        for k_page, chunks, embeddings, file_metadata in self.extract_text(filepath):
            # Upsert into Qdrant
            self.qdrant.record_embeddings(k_page, chunks, embeddings, file_metadata)
            nb_emb += len(embeddings)

        # Update state DB
        set_stored_timestamp(filepath, mtime)
        logger.info(f"[INDEX] Upserted {nb_emb} vectors")

    def remove_file(self, filepath: Path):
        """
        Delete all vectors whose payload ``source`` matches this file's path.
        Rather than regenerating the chunk IDs from the stored state, we simply
        query Qdrant for points whose ``payload.source`` matches and delete them.

        Args:
            filepath: Path to the file to be removed, relative to DOCS_PATH

        """
        logger.info(f"[DELETE] Removing file from index: '{filepath}'")

        # Query Qdrant for all points with payload.source == filepath
        filter_ = Filter(must=[FieldCondition(key="source", match=MatchValue(value=str(filepath)))])

        # Retrieve the IDs matching that filter
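        # NOTE: limit=1000 bounds how many matching points are fetched per deletion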
        hits = self.qdrant.search(limit=1000, query_filter=filter_)

        ids_to_delete = [hit.id for hit in hits]
        if ids_to_delete:
            self.qdrant.delete(ids_to_delete)
            logger.info(f"[DELETE] Removed {len(ids_to_delete)} vectors")

        # Remove from state DB
        delete_stored_file(filepath)

    def initial_scan(self) -> int:
        """
        On startup, walk DOCS_PATH and EMAILS_PATH and index any new or modified files.
        Also, find any entries in the state DB that no longer exist on disk, and remove them.
        """
        logger.info("Performing initial scan of documents folder...")

        # 1. Build a list of all file paths on disk
        disk_files: list[Path] = []
        for root in (config.DOCS_PATH, config.EMAILS_PATH):
            for ext in ("*.pdf", "*.docx", "*.xlsx", "*.xlsm", "*.md", "*.txt"):
                disk_files.extend(root.rglob(ext))
        disk_files = [p.resolve() for p in disk_files]

        # 2. For each file on disk, check its timestamp vs. the state DB
        files_to_index = []
        for file_path in disk_files:
            stored = get_stored_timestamp(file_path)
            modified = os.path.getmtime(str(file_path))
            if stored is None or stored != modified:
                files_to_index.append(file_path)

        # 3. For each new or modified file on disk, process its chunks
        tot_nb_files = len(files_to_index)
        for n_file, file_path in enumerate(files_to_index, start=1):
            logger.info(f"Initial indexation of {n_file}/{tot_nb_files} - '{file_path}'")
            self.process_file(file_path)

        # 4. For each file in the state DB, if it is no longer on disk, delete it from Qdrant
        for relpath in list_stored_files():
            abspath = config.DOCS_PATH / relpath
            if not abspath.exists():
                # Remove from Qdrant
                self.remove_file(relpath)

        return tot_nb_files

    def __on_created_or_modified(self, event: FileSystemEvent):
        if event.is_directory:
            return

        filepath = Path(event.src_path)
        if not self.doc_factory.filter_file(filepath):
            return

        with self.lock:
            # Small delay to allow file write to finish
            time.sleep(0.5)
            self.process_file(filepath)

    def __on_deleted(self, event: FileSystemEvent):
        if event.is_directory:
            return

        filepath = Path(event.src_path)
        if not self.doc_factory.filter_file(filepath):
            return

        with self.lock:
            self.remove_file(filepath)

    def __on_moved(self, event: FileSystemEvent):
        # TODO Implement folder and file renaming
        if event.is_directory:
            return

        srcpath = Path(event.src_path)
        destpath = Path(event.dest_path)
        if srcpath.suffix in (".pdf", ".docx", ".xlsx", ".xlsm", ".md", ".txt"):
            with self.lock:
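                # Small delay to allow the file write to finish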
                time.sleep(0.5)
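                # A move is handled as a delete at the old path plus a re-index at the new path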
                self.remove_file(srcpath)
                self.process_file(destpath)

    def start_watcher(self):
        """
        Launch the filesystem monitoring. The observers run in their own
        threads, but this method blocks by joining them.

        """
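        # Route watchdog callbacks to this class's private handlers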
        event_handler = FileSystemEventHandler()
        event_handler.on_created = self.__on_created_or_modified
        event_handler.on_modified = self.__on_created_or_modified
        event_handler.on_moved = self.__on_moved
        event_handler.on_deleted = self.__on_deleted

        # Files observer
        self.__docs_observer = Observer()
        self.__docs_observer.schedule(event_handler, path=str(config.DOCS_PATH), recursive=True)
        self.__docs_observer.start()

        logger.info(f"Started file watcher on: '{config.DOCS_PATH}'")

        # Emails observer
        self.__emails_observer = Observer()
        self.__emails_observer.schedule(event_handler, path=str(config.EMAILS_PATH), recursive=True)
        self.__emails_observer.start()

        logger.info(f"Started emails watcher on: '{config.EMAILS_PATH}'")

        self.__docs_observer.join()
        self.__emails_observer.join()
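

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module): how a caller
# is expected to drive DocumentIndexer, based only on the API defined above.
# Run as a module (e.g. `python -m ragindexer.DocumentIndexer`) so that the
# relative imports resolve.
if __name__ == "__main__":
    indexer = DocumentIndexer()
    # One-off catch-up pass over DOCS_PATH and EMAILS_PATH
    nb_files = indexer.initial_scan()
    logger.info(f"Initial scan indexed {nb_files} file(s)")
    # Hand control to the watcher; this call blocks, joining both observers
    indexer.start_watcher()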