00001
00002
00003
00004
00005
00006
00007
00008 #include "config.h"
00009
00010 #ifndef lint
00011 static const char revid[] = "$Id: qam__open_8c-source.html,v 1.1 2008/06/08 10:21:45 sebdiaz Exp $";
00012 #endif
00013
00014 #ifndef NO_SYSTEM_INCLUDES
00015 #include <sys/types.h>
00016
00017 #include <errno.h>
00018 #include <string.h>
00019 #endif
00020
00021 #include "db_int.h"
00022 #include "db_page.h"
00023 #include "db_shash.h"
00024 #include "db_swap.h"
00025 #include "db_am.h"
00026 #include "lock.h"
00027 #include "qam.h"
00028
00029
00030
00031
00032
00033
00034 int
00035 CDB___qam_open(dbp, name, base_pgno, flags)
00036 DB *dbp;
00037 const char *name;
00038 db_pgno_t base_pgno;
00039 u_int32_t flags;
00040 {
00041 QUEUE *t;
00042 DBC *dbc;
00043 DB_LOCK metalock;
00044 DB_LSN orig_lsn;
00045 QMETA *qmeta;
00046 int locked;
00047 int ret, t_ret;
00048
00049 ret = 0;
00050 locked = 0;
00051 t = dbp->q_internal;
00052
00053
00054 dbp->del = CDB___qam_delete;
00055 dbp->put = CDB___qam_put;
00056 dbp->stat = CDB___qam_stat;
00057
00058 metalock.off = LOCK_INVALID;
00059
00060
00061
00062
00063
00064
00065
00066 if ((ret = dbp->cursor(dbp, dbp->open_txn,
00067 &dbc, LF_ISSET(DB_CREATE) && LOCKING(dbp->dbenv) ?
00068 DB_WRITECURSOR : 0)) != 0)
00069 return (ret);
00070
00071
00072 if ((ret =
00073 CDB___db_lget(dbc, 0, base_pgno, DB_LOCK_READ, 0, &metalock)) != 0)
00074 goto err;
00075 if ((ret = CDB_memp_fget(
00076 dbp->mpf, &base_pgno, DB_MPOOL_CREATE, (PAGE **)&qmeta)) != 0)
00077 goto err;
00078
00079
00080
00081
00082
00083
00084 again: if (qmeta->dbmeta.magic == DB_QAMMAGIC) {
00085 t->re_pad = qmeta->re_pad;
00086 t->re_len = qmeta->re_len;
00087 t->rec_page = qmeta->rec_page;
00088
00089 (void)CDB_memp_fput(dbp->mpf, (PAGE *)qmeta, 0);
00090 goto done;
00091 }
00092
00093
00094 if (LOCKING(dbp->dbenv)) {
00095 DB_ASSERT(LF_ISSET(DB_CREATE));
00096 if ((ret = CDB_lock_get(dbp->dbenv, dbc->locker, DB_LOCK_UPGRADE,
00097 &dbc->lock_dbt, DB_LOCK_WRITE, &dbc->mylock)) != 0)
00098 goto err;
00099 }
00100
00101
00102
00103
00104
00105 if (locked == 0 && STD_LOCKING(dbc)) {
00106 if ((ret = __LPUT(dbc, metalock)) != 0)
00107 goto err;
00108 if ((ret = CDB___db_lget(dbc,
00109 0, base_pgno, DB_LOCK_WRITE, 0, &metalock)) != 0)
00110 goto err;
00111 locked = 1;
00112 goto again;
00113 }
00114
00115 orig_lsn = qmeta->dbmeta.lsn;
00116 memset(qmeta, 0, sizeof(QMETA));
00117 ZERO_LSN(qmeta->dbmeta.lsn);
00118 qmeta->dbmeta.pgno = base_pgno;
00119 qmeta->dbmeta.magic = DB_QAMMAGIC;
00120 qmeta->dbmeta.version = DB_QAMVERSION;
00121 qmeta->dbmeta.pagesize = dbp->pgsize;
00122 qmeta->dbmeta.type = P_QAMMETA;
00123 qmeta->re_pad = t->re_pad;
00124 qmeta->start = 1;
00125 qmeta->re_len = t->re_len;
00126 qmeta->rec_page = CALC_QAM_RECNO_PER_PAGE(dbp);
00127 t->rec_page = qmeta->rec_page;
00128 memcpy(qmeta->dbmeta.uid, dbp->fileid, DB_FILE_ID_LEN);
00129
00130
00131 if (QAM_RECNO_PER_PAGE(dbp) < 1) {
00132 CDB___db_err(dbp->dbenv,
00133 "Record size of %lu too large for page size of %lu",
00134 (u_long)t->re_len, (u_long)dbp->pgsize);
00135 (void)CDB_memp_fput(dbp->mpf, (PAGE *)qmeta, 0);
00136 ret = EINVAL;
00137 goto err;
00138 }
00139
00140 if ((ret = CDB___db_log_page(dbp,
00141 name, &orig_lsn, base_pgno, (PAGE *)qmeta)) != 0)
00142 goto err;
00143
00144
00145 if ((ret = CDB_memp_fput(dbp->mpf, (PAGE *)qmeta, DB_MPOOL_DIRTY)) != 0)
00146 goto err;
00147 DB_TEST_RECOVERY(dbp, DB_TEST_POSTLOG, ret, name);
00148
00149
00150
00151
00152
00153
00154
00155
00156 if ((ret = CDB_memp_fsync(dbp->mpf)) == DB_INCOMPLETE) {
00157 CDB___db_err(dbp->dbenv, "Flush of metapage failed");
00158 ret = EINVAL;
00159 }
00160 DB_TEST_RECOVERY(dbp, DB_TEST_POSTSYNC, ret, name);
00161
00162 done: t->q_meta = base_pgno;
00163 t->q_root = base_pgno + 1;
00164
00165 err:
00166 DB_TEST_RECOVERY_LABEL
00167
00168 (void)__LPUT(dbc, metalock);
00169
00170 if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
00171 ret = t_ret;
00172
00173 return (ret);
00174 }
00175
00176
00177
00178
00179
00180
00181 int
00182 CDB___qam_metachk(dbp, name, qmeta)
00183 DB *dbp;
00184 const char *name;
00185 QMETA *qmeta;
00186 {
00187 DB_ENV *dbenv;
00188 u_int32_t vers;
00189 int ret;
00190
00191 dbenv = dbp->dbenv;
00192
00193
00194
00195
00196
00197 vers = qmeta->dbmeta.version;
00198 if (F_ISSET(dbp, DB_AM_SWAP))
00199 M_32_SWAP(vers);
00200 switch (vers) {
00201 case 1:
00202 CDB___db_err(dbenv,
00203 "%s: queue version %lu requires a version upgrade",
00204 name, (u_long)vers);
00205 return (DB_OLD_VERSION);
00206 case 2:
00207 break;
00208 default:
00209 CDB___db_err(dbenv,
00210 "%s: unsupported qam version: %lu", name, (u_long)vers);
00211 return (EINVAL);
00212 }
00213
00214
00215 if (F_ISSET(dbp, DB_AM_SWAP) && (ret = CDB___qam_mswap((PAGE *)qmeta)) != 0)
00216 return (ret);
00217
00218
00219 if (dbp->type != DB_QUEUE && dbp->type != DB_UNKNOWN)
00220 return (EINVAL);
00221 dbp->type = DB_QUEUE;
00222 DB_ILLEGAL_METHOD(dbp, DB_OK_QUEUE);
00223
00224
00225 dbp->pgsize = qmeta->dbmeta.pagesize;
00226
00227
00228 memcpy(dbp->fileid, qmeta->dbmeta.uid, DB_FILE_ID_LEN);
00229
00230 return (0);
00231 }