00001
00002
00003
00004
00005
00006
00007
00008 #ifdef HAVE_CONFIG_H
00009 #include "config.h"
00010 #endif
00011
00012 #include <stdio.h>
00013 #include <unistd.h>
00014 #include <stdlib.h>
00015 #include <sys/stat.h>
00016 #include <errno.h>
00017
00018 #include "WordKey.h"
00019 #include "WordDB.h"
00020 #include "WordDBCache.h"
00021 #include "WordMeta.h"
00022 #include "ber.h"
00023
00024 int WordDBCaches::Add(char* key, int key_size, char* data, int data_size)
00025 {
00026 int ret;
00027 if((ret = cache.Allocate(key_size + data_size)) == ENOMEM) {
00028 if((ret = CacheFlush()) != 0) return ret;
00029 if((ret = cache.Allocate(key_size + data_size))) return ret;
00030 }
00031
00032 return cache.Add(key, key_size, data, data_size);
00033 }
00034
00035 int WordDBCaches::AddFile(String& filename)
00036 {
00037 char tmp[32];
00038 unsigned int serial;
00039 words->Meta()->Serial(WORD_META_SERIAL_FILE, serial);
00040 if(serial == WORD_META_SERIAL_INVALID)
00041 return NOTOK;
00042 filename = words->Filename();
00043 sprintf(tmp, "C%08d", serial - 1);
00044 filename << tmp;
00045
00046
00047
00048
00049
00050 size = 0;
00051 {
00052 String filename;
00053 String dummy;
00054 WordDBCursor* cursor = files->Cursor();
00055 struct stat stat_buf;
00056 int i;
00057 int ret;
00058 for(i = 0; (ret = cursor->Get(filename, dummy, DB_NEXT)) == 0; i++) {
00059
00060
00061
00062
00063
00064
00065 if(stat((char*)filename, &stat_buf) == 0) {
00066 size += stat_buf.st_size;
00067 } else if(errno != ENOENT) {
00068 const String message = String("WordDBCaches::AddFile: cannot stat ") + filename;
00069 perror((const char*)message);
00070 return NOTOK;
00071 }
00072 }
00073 delete cursor;
00074 }
00075
00076 String dummy;
00077 if(files->Put(0, filename, dummy, 0) != 0)
00078 return NOTOK;
00079
00080 return OK;
00081 }
00082
00083 int WordDBCaches::CacheFlush()
00084 {
00085 if(cache.Empty()) return OK;
00086
00087 if(cache.Sort() != OK) return NOTOK;
00088 String filename;
00089 int locking = 0;
00090 if(!lock) {
00091 words->Meta()->Lock("cache", lock);
00092 locking = 1;
00093 }
00094 if(AddFile(filename) != OK) return NOTOK;
00095 if(CacheWrite(filename) != OK) return NOTOK;
00096
00097 unsigned int serial;
00098 words->Meta()->GetSerial(WORD_META_SERIAL_FILE, serial);
00099 if(serial >= (unsigned int)file_max || Full())
00100 if(Merge() != OK) return NOTOK;
00101 if(locking) words->Meta()->Unlock("cache", lock);
00102
00103 return OK;
00104 }
00105
00106 static int merge_cmp_size(WordDBCaches* , WordDBCacheFile* a, WordDBCacheFile* b)
00107 {
00108 return b->size - a->size;
00109 }
00110
00111 int WordDBCaches::Merge()
00112 {
00113 if(CacheFlush() != OK) return NOTOK;
00114
00115 int locking = 0;
00116 if(!lock) {
00117 words->Meta()->Lock("cache", lock);
00118 locking = 1;
00119 }
00120 unsigned int serial;
00121 words->Meta()->GetSerial(WORD_META_SERIAL_FILE, serial);
00122 if(serial <= 1) return OK;
00123
00124
00125
00126
00127 WordDBCacheFile* heap = new WordDBCacheFile[serial];
00128 {
00129 String filename;
00130 String dummy;
00131 WordDBCursor* cursor = files->Cursor();
00132 struct stat stat_buf;
00133 int i;
00134 int ret;
00135 for(i = 0; (ret = cursor->Get(filename, dummy, DB_NEXT)) == 0; i++) {
00136 WordDBCacheFile& file = heap[i];
00137 file.filename = filename;
00138 if(stat((char*)file.filename, &stat_buf) == 0) {
00139 file.size = stat_buf.st_size;
00140 } else {
00141 const String message = String("WordDBCaches::Merge: cannot stat ") + file.filename;
00142 perror((const char*)message);
00143 return NOTOK;
00144 }
00145 cursor->Del();
00146 }
00147 delete cursor;
00148 myqsort((void*)heap, serial, sizeof(WordDBCacheFile), (myqsort_cmp)merge_cmp_size, (void*)this);
00149 }
00150
00151 String tmpname = words->Filename() + String("C.tmp");
00152
00153 while(serial > 1) {
00154 WordDBCacheFile* a = &heap[serial - 1];
00155 WordDBCacheFile* b = &heap[serial - 2];
00156
00157 if(Merge(a->filename, b->filename, tmpname) != OK) return NOTOK;
00158
00159
00160
00161
00162 if(unlink((char*)a->filename) != 0) {
00163 const String message = String("WordDBCaches::Merge: unlink ") + a->filename;
00164 perror((const char*)message);
00165 return NOTOK;
00166 }
00167
00168
00169
00170
00171 if(unlink((char*)b->filename) != 0) {
00172 const String message = String("WordDBCaches::Merge: unlink ") + b->filename;
00173 perror((const char*)message);
00174 return NOTOK;
00175 }
00176
00177
00178
00179
00180 if(rename((char*)tmpname, (char*)b->filename) != 0) {
00181 const String message = String("WordDBCaches::Merge: rename ") + tmpname + String(" ") + b->filename;
00182 perror((const char*)message);
00183 return NOTOK;
00184 }
00185
00186
00187
00188
00189
00190 b->size += a->size;
00191
00192 serial--;
00193
00194
00195
00196 myqsort((void*)heap, serial, sizeof(WordDBCacheFile), (myqsort_cmp)merge_cmp_size, (void*)this);
00197 }
00198
00199 {
00200 String newname(words->Filename());
00201 newname << "C00000000";
00202
00203 if(rename((char*)heap[0].filename, (char*)newname) != 0) {
00204 const String message = String("WordDBCaches::Merge: rename ") + heap[0].filename + String(" ") + newname;
00205 perror((const char*)message);
00206 return NOTOK;
00207 }
00208
00209 String dummy;
00210 if(files->Put(0, newname, dummy, 0) != 0)
00211 return NOTOK;
00212 words->Meta()->SetSerial(WORD_META_SERIAL_FILE, serial);
00213 }
00214 if(locking) words->Meta()->Unlock("cache", lock);
00215
00216 return OK;
00217 }
00218
00219 int WordDBCaches::Merge(const String& filea, const String& fileb, const String& tmpname)
00220 {
00221 FILE* ftmp = fopen((const char*)tmpname, "w");
00222 FILE* fa = fopen((const char*)filea, "r");
00223 FILE* fb = fopen((const char*)fileb, "r");
00224
00225 unsigned int buffertmp_size = 128;
00226 unsigned char* buffertmp = (unsigned char*)malloc(buffertmp_size);
00227 unsigned int buffera_size = 128;
00228 unsigned char* buffera = (unsigned char*)malloc(buffera_size);
00229 unsigned int bufferb_size = 128;
00230 unsigned char* bufferb = (unsigned char*)malloc(bufferb_size);
00231
00232 unsigned int entriesa_length;
00233 if(ber_file2value(fa, entriesa_length) < 1) return NOTOK;
00234 unsigned int entriesb_length;
00235 if(ber_file2value(fb, entriesb_length) < 1) return NOTOK;
00236
00237 if(ber_value2file(ftmp, entriesa_length + entriesb_length) < 1) return NOTOK;
00238
00239 WordDBCacheEntry entrya;
00240 WordDBCacheEntry entryb;
00241
00242 if(entriesa_length > 0 && entriesb_length > 0) {
00243
00244 if(ReadEntry(fa, entrya, buffera, buffera_size) != OK) return NOTOK;
00245 if(ReadEntry(fb, entryb, bufferb, bufferb_size) != OK) return NOTOK;
00246
00247 while(entriesa_length > 0 && entriesb_length > 0) {
00248 if(WordKey::Compare(words->GetContext(), (const unsigned char*)entrya.key, entrya.key_size, (const unsigned char*)entryb.key, entryb.key_size) < 0) {
00249 if(WriteEntry(ftmp, entrya, buffertmp, buffertmp_size) != OK) return NOTOK;
00250 if(--entriesa_length > 0)
00251 if(ReadEntry(fa, entrya, buffera, buffera_size) != OK) return NOTOK;
00252 } else {
00253 if(WriteEntry(ftmp, entryb, buffertmp, buffertmp_size) != OK) return NOTOK;
00254 if(--entriesb_length > 0)
00255 if(ReadEntry(fb, entryb, bufferb, bufferb_size) != OK) return NOTOK;
00256 }
00257 }
00258 }
00259
00260 if(entriesa_length > 0 || entriesb_length > 0) {
00261 FILE* fp = entriesa_length > 0 ? fa : fb;
00262 unsigned int& entries_length = entriesa_length > 0 ? entriesa_length : entriesb_length;
00263 WordDBCacheEntry& entry = entriesa_length > 0 ? entrya : entryb;
00264 while(entries_length > 0) {
00265 if(WriteEntry(ftmp, entry, buffertmp, buffertmp_size) != OK) return NOTOK;
00266 if(--entries_length > 0)
00267 if(ReadEntry(fp, entry, buffera, buffera_size) != OK) return NOTOK;
00268 }
00269 }
00270
00271 free(buffera);
00272 free(bufferb);
00273 free(buffertmp);
00274
00275 fclose(fa);
00276 fclose(fb);
00277 fclose(ftmp);
00278
00279 return OK;
00280 }
00281
00282 int WordDBCaches::Merge(WordDB& db)
00283 {
00284 int locking = 0;
00285 if(!lock) {
00286 words->Meta()->Lock("cache", lock);
00287 locking = 1;
00288 }
00289 if(Merge() != OK) return NOTOK;
00290
00291 String filename;
00292 String dummy;
00293 WordDBCursor* cursor = files->Cursor();
00294 if(cursor->Get(filename, dummy, DB_FIRST) != 0) {
00295 delete cursor;
00296 return NOTOK;
00297 }
00298 cursor->Del();
00299 delete cursor;
00300
00301 FILE* fp = fopen((char*)filename, "r");
00302
00303 unsigned int buffer_size = 128;
00304 unsigned char* buffer = (unsigned char*)malloc(buffer_size);
00305
00306 unsigned int entries_length;
00307 if(ber_file2value(fp, entries_length) < 1) return NOTOK;
00308
00309 WordDBCacheEntry entry;
00310
00311 unsigned int i;
00312 for(i = 0; i < entries_length; i++) {
00313 if(ReadEntry(fp, entry, buffer, buffer_size) != OK) return NOTOK;
00314 void* user_data = words->GetContext();
00315 WORD_DBT_INIT(rkey, (void*)entry.key, entry.key_size);
00316 WORD_DBT_INIT(rdata, (void*)entry.data, entry.data_size);
00317 db.db->put(db.db, 0, &rkey, &rdata, 0);
00318 }
00319
00320 if(unlink((char*)filename) != 0) {
00321 const String message = String("WordDBCaches::Merge: unlink ") + filename;
00322 perror((const char*)message);
00323 return NOTOK;
00324 }
00325
00326 words->Meta()->SetSerial(WORD_META_SERIAL_FILE, 0);
00327 if(locking) words->Meta()->Unlock("cache", lock);
00328 size = 0;
00329 free(buffer);
00330 fclose(fp);
00331
00332 return OK;
00333 }
00334
00335 int WordDBCaches::CacheWrite(const String& filename)
00336 {
00337 FILE* fp = fopen(filename, "w");
00338 if(!fp) {
00339 String message;
00340 message << "WordDBCaches::CacheWrite()" << filename << "): ";
00341 perror((char*)message);
00342 return NOTOK;
00343 }
00344
00345 int entries_length;
00346 WordDBCacheEntry* entries;
00347 int ret;
00348 if((ret = cache.Entries(entries, entries_length)) != 0)
00349 return ret;
00350
00351 if(ber_value2file(fp, entries_length) < 1) return NOTOK;
00352
00353 unsigned int buffer_size = 1024;
00354 unsigned char* buffer = (unsigned char*)malloc(buffer_size);
00355 int i;
00356 for(i = 0; i < entries_length; i++) {
00357 if(WriteEntry(fp, entries[i], buffer, buffer_size) != OK) return NOTOK;
00358 }
00359 free(buffer);
00360 fclose(fp);
00361
00362 cache.Flush();
00363
00364 return OK;
00365 }
00366
00367 int WordDBCaches::WriteEntry(FILE* fp, WordDBCacheEntry& entry, unsigned char*& buffer, unsigned int& buffer_size)
00368 {
00369 if(entry.key_size + entry.data_size + 64 > buffer_size) {
00370 buffer_size = entry.key_size + entry.data_size + 64;
00371 buffer = (unsigned char*)realloc(buffer, buffer_size);
00372 }
00373
00374 int p_size = buffer_size;
00375 unsigned char* p = buffer;
00376
00377 int ber_len;
00378 if((ber_len = ber_value2buf(p, p_size, entry.key_size)) < 1) {
00379 fprintf(stderr, "WordDBCaches::WriteEntry: BER failed for key %d\n", entry.key_size);
00380 return NOTOK;
00381 }
00382 p += ber_len;
00383 memcpy(p, entry.key, entry.key_size);
00384 p += entry.key_size;
00385
00386 p_size -= ber_len + entry.key_size;
00387
00388 if((ber_len = ber_value2buf(p, p_size, entry.data_size)) < 1) {
00389 fprintf(stderr, "WordDBCaches::WriteEntry: BER failed for data %d\n", entry.data_size);
00390 return NOTOK;
00391 }
00392 p += ber_len;
00393 memcpy(p, entry.data, entry.data_size);
00394 p += entry.data_size;
00395
00396 if(fwrite((void*)buffer, p - buffer, 1, fp) != 1) {
00397 perror("WordDBCaches::WriteEntry: cannot write entry ");
00398 return NOTOK;
00399 }
00400
00401 return OK;
00402 }
00403
00404 int WordDBCaches::ReadEntry(FILE* fp, WordDBCacheEntry& entry, unsigned char*& buffer, unsigned int& buffer_size)
00405 {
00406 if(ber_file2value(fp, entry.key_size) < 1) return NOTOK;
00407
00408 if(entry.key_size > buffer_size) {
00409 buffer_size += entry.key_size;
00410 if(!(buffer = (unsigned char*)realloc(buffer, buffer_size))) return NOTOK;
00411 }
00412
00413 if(fread((void*)buffer, entry.key_size, 1, fp) != 1) {
00414 perror("WordDBCaches::ReadEntry(): cannot read key entry ");
00415 return NOTOK;
00416 }
00417
00418 if(ber_file2value(fp, entry.data_size) < 1) return NOTOK;
00419
00420 if(entry.data_size > 0) {
00421 if(entry.data_size + entry.key_size > buffer_size) {
00422 buffer_size += entry.data_size;
00423 if(!(buffer = (unsigned char*)realloc(buffer, buffer_size))) return NOTOK;
00424 }
00425
00426 if(fread((void*)(buffer + entry.key_size), entry.data_size, 1, fp) != 1) {
00427 perror("WordDBCaches::ReadEntry(): cannot read data entry ");
00428 return NOTOK;
00429 }
00430 }
00431
00432 entry.key = (char*)buffer;
00433 entry.data = (char*)(buffer + entry.key_size);
00434
00435 return OK;
00436 }