fix: parallel execution

This commit is contained in:
Alex 2025-05-06 18:03:51 +03:00
parent 47f21aecdc
commit 84ff2837eb
19 changed files with 56 additions and 151 deletions

View File

@ -27,7 +27,7 @@ export const addWord = cwait(async (...[word]: AddWordArgs) => {
await storeOp(constructClaim(createWordKey(id), { word }, '0x0'));
console.log('await extraLogic');
const wordClaim = await extraLogic(id);
const [wordClaim] = await Promise.all([extraLogic(id, 1), extraLogic(id, 2)]);
const newWord1 = (wordClaim?.body.word ?? '') + '_1';
const newId1 = hashCode(newWord1);

View File

@ -5,14 +5,18 @@ import { createWordKey, WordClaimBody } from '../offchain';
import { extraLogic2 } from './extraLogic2';
export const extraLogic = cwait(async (id: string) => {
console.log('extraLogic START');
export const extraLogic = cwait(async (id: string, i: number) => {
console.log('extraLogic START + ', i);
const result = await readOp<TypedClaim<WordClaimBody>>(createWordKey(id));
await storeOp(
constructClaim(createWordKey(id), { word: result?.body.word.split('').reverse().join('') + '_extraLogic' }, '0x0')
constructClaim(
createWordKey(id),
{ word: result?.body.word.split('').reverse().join('') + '_extraLogic + ' + i },
'0x0'
)
);
console.log('extraLogic return extraLogic2');
return extraLogic2(id);
console.log('extraLogic return extraLogic2 + ', i);
return extraLogic2(id, i);
});

View File

@ -2,12 +2,12 @@ import { cwait, readOp, TypedClaim } from 'cwait';
import { createWordKey, WordClaimBody } from '../offchain';
export const extraLogic2 = cwait(async (id: string) => {
console.log('extraLogic2 START');
export const extraLogic2 = cwait(async (id: string, i: number) => {
console.log('extraLogic2 START + ', i);
console.log('await readOp');
console.log('await extraLogic2 readOp + ', i);
const result = await readOp<TypedClaim<WordClaimBody>>(createWordKey(id));
console.log('extraLogic2 END');
console.log('extraLogic2 END + ', i);
return result;
});

View File

@ -93,7 +93,6 @@ export const handleContext = (ctx: Context) => {
console.log('Available Cweb: ', availableCweb);
console.log('Stored Cweb: ', storedCweb);
console.log('Take Ops: ', fundsTakeOps);
initialContext.funds = {
availableCweb: availableCweb,

View File

@ -114,9 +114,6 @@ export const extractOps = ({
const execOpResultClaim = extractRead(nextAfterBlock)?.[0]?.content;
console.log('nextAfterBlock >>>', JSON.stringify(nextAfterBlock));
console.log('execOpResultClaim >>>', JSON.stringify(execOpResultClaim));
if (!execOpResultClaim) {
throw new Error('Wrong mutex exec result claim');
}
@ -151,8 +148,6 @@ export const extractOps = ({
const totalOpsCount = extractedOps.length + executedOps.length;
for (let i = 0; i < totalOpsCount; i++) {
console.log('i >>>', i);
console.log('execOpsIndexes.includes(i) >>>', execOpsIndexes.includes(i));
if (execOpsIndexes.includes(i)) {
const op = executedOps.shift();
@ -160,8 +155,6 @@ export const extractOps = ({
throw new Error('Wrong mutex exec result place');
}
console.log('op >>>', JSON.stringify(op));
if (op.ok) {
allOps.push(op.resolved);
} else {

View File

@ -1,11 +1,14 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { Context, NewTx, getMethodArguments, isSelfCall } from '@coinweb/contract-kit';
import { Task } from '../types';
import { context, getRawContext, handleContext } from './context';
import { getAwaitedTasks } from './promisifiedFeatures';
import { pushResolvedOp } from './promisifiedFeatures/runtime/resolvedOps';
import { constructTx } from './utils';
let abortExecution: ((result: boolean) => void) | null = null;
let abortExecution: ((result: { isFullyExecuted: boolean; awaitedTasks: Task[] }) => void) | null = null;
export const executor = (method: (...args: any[]) => Promise<void>) => {
return async (ctx: Context): Promise<NewTx[]> => {
@ -18,16 +21,19 @@ export const executor = (method: (...args: any[]) => Promise<void>) => {
}
if (shouldRestart) {
return constructTx(false);
const awaitedTasks = getAwaitedTasks();
return constructTx(awaitedTasks, false);
}
const execution = new Promise<boolean>((resolve, reject) => {
const execution = new Promise<{ isFullyExecuted: boolean; awaitedTasks: Task[] }>((resolve, reject) => {
abortExecution = resolve;
method(...context.initialArgs).then(
() => {
console.log('<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< executor-resolved');
resolve(true);
const awaitedTasks = getAwaitedTasks();
resolve({ isFullyExecuted: true, awaitedTasks });
},
(error) => {
console.log(' <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<executor-rejected');
@ -40,12 +46,14 @@ export const executor = (method: (...args: any[]) => Promise<void>) => {
//@ts-expect-error
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
os.setTimeout(() => {
abortExecution?.(false);
const awaitedTasks = getAwaitedTasks();
abortExecution?.({ isFullyExecuted: false, awaitedTasks });
}, 0);
try {
const isFullyExecuted = await execution;
return constructTx(isFullyExecuted);
const { isFullyExecuted, awaitedTasks } = await execution;
return constructTx(awaitedTasks, isFullyExecuted);
} catch (error) {
console.log('<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< executor-error');
console.log((error as Error).message);
@ -56,5 +64,6 @@ export const executor = (method: (...args: any[]) => Promise<void>) => {
export const stopExecution = () => {
console.log('stopExecution');
abortExecution?.(false);
const awaitedTasks = getAwaitedTasks();
abortExecution?.({ isFullyExecuted: false, awaitedTasks });
};

View File

@ -1,37 +1,29 @@
import { constructStore } from '@coinweb/contract-kit/dist/esm/operations/store';
import { constructResultClaim } from '../../../claims/result';
import { context } from '../../../context';
import { stopExecution } from '../../../executor';
import { opMarker } from '../../../global';
import { isResolvedChildOp, isResolvedExecOp, isResolvedSlotOp } from '../../../utils';
import { uuid } from '../../../utils';
import { getAwaitedTasks, pushAwaitedTask } from '../../runtime/awaitedTasks';
import { getAwaitedTasksCount, pushAwaitedTask } from '../../runtime/awaitedTasks';
import { getUsedOps, saveUsedOps, shiftResolvedOp } from '../../runtime/resolvedOps';
let isRootDetected = false;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const cwait = <TAsyncCallback extends (...args: any[]) => Promise<unknown>>(asyncCallback: TAsyncCallback) => {
console.log('cwait: ', asyncCallback.name);
let isRoot = false;
return (async (...args: Parameters<TAsyncCallback>) => {
console.log('cwait callback');
if (!isRootDetected) {
isRootDetected = true;
isRoot = true;
}
console.log('isRoot:', isRoot);
console.log('isChild:', context.isChild);
if (isRoot) {
return asyncCallback(...args);
}
console.log('child logic');
const { op, isOp } = shiftResolvedOp();
if (!isOp) {
@ -48,33 +40,27 @@ export const cwait = <TAsyncCallback extends (...args: any[]) => Promise<unknown
return result as ReturnType<TAsyncCallback>;
} else {
if (isResolvedSlotOp(op)) {
console.log('cwait-slotOp');
return new Promise(() => null);
}
if (isResolvedExecOp(op)) {
console.log('cwait-execOp');
saveUsedOps();
const result = await asyncCallback(...args);
asyncCallback(...args);
const awaitedOps = getAwaitedTasks();
if (!awaitedOps.length) {
if (!getAwaitedTasksCount()) {
pushAwaitedTask(constructStore(constructResultClaim(op.ExecOp.id, getUsedOps())));
stopExecution();
}
return result;
stopExecution();
return new Promise(() => null);
}
if (isResolvedChildOp(op)) {
return asyncCallback(...args);
}
console.log('cwait-error');
throw new Error('Read operation not found');
}
}) as TAsyncCallback;

View File

@ -5,7 +5,6 @@ import { isResolvedBlockOp, isResolvedSlotOp } from '../../../utils';
import { pushAwaitedTask, shiftResolvedOp } from '../../runtime';
export const blockOp = (filters: BlockFilter[]) => {
console.log('blockOp');
let opMarkerValue = false;
const result = new Promise<[BlockFilter, boolean][] | null>((resolve, reject) => {
@ -17,12 +16,10 @@ export const blockOp = (filters: BlockFilter[]) => {
opMarkerValue = true;
} else {
if (isResolvedSlotOp(op)) {
console.log('blockOp-slotOp');
return;
}
if (!isResolvedBlockOp(op)) {
console.log('blockOp-error');
throw new Error('Block operation not found');
}

View File

@ -13,7 +13,6 @@ export const rangeReadOp = <TClaims extends Claim[] = TypedClaim[]>(
maxCount: number
) => {
let opMarkerValue = false;
console.log('rangeReadOp');
const result = new Promise<TClaims | null>((resolve, reject) => {
try {
@ -24,12 +23,10 @@ export const rangeReadOp = <TClaims extends Claim[] = TypedClaim[]>(
opMarkerValue = true;
} else {
if (isResolvedSlotOp(op)) {
console.log('rangeReadOp-slotOp');
return;
}
if (!isResolvedReadOp(op)) {
console.log('rangeReadOp-error');
throw new Error('Read operation not found');
}

View File

@ -7,7 +7,6 @@ import { isResolvedReadOp, isResolvedSlotOp } from '../../../utils';
import { pushAwaitedTask, shiftResolvedOp } from '../../runtime';
export const readOp = <TClaim extends Claim = TypedClaim>(key: ClaimKey) => {
console.log('readOp');
let opMarkerValue = false;
const result = new Promise<TClaim | null>((resolve, reject) => {
@ -19,12 +18,10 @@ export const readOp = <TClaim extends Claim = TypedClaim>(key: ClaimKey) => {
opMarkerValue = true;
} else {
if (isResolvedSlotOp(op)) {
console.log('readOp-slotOp');
return;
}
if (!isResolvedReadOp(op)) {
console.log('readOp-error');
throw new Error('Read operation not found');
}

View File

@ -6,7 +6,6 @@ import { isResolvedSlotOp, isResolvedStoreOp } from '../../../utils';
import { pushAwaitedTask, shiftResolvedOp } from '../../runtime';
export const storeOp = (claim: Claim, storeCweb?: bigint) => {
console.log('storeOp');
let opMarkerValue = false;
const result = new Promise<Claim | null>((resolve, reject) => {
@ -18,12 +17,10 @@ export const storeOp = (claim: Claim, storeCweb?: bigint) => {
opMarkerValue = true;
} else {
if (isResolvedSlotOp(op)) {
console.log('storeOp-slotOp');
return;
}
if (!isResolvedStoreOp(op)) {
console.log('storeOp-error');
throw new Error('Store operation not found');
}

View File

@ -6,7 +6,6 @@ import { isResolvedSlotOp, isResolvedTakeOp } from '../../../utils';
import { pushAwaitedTask, shiftResolvedOp } from '../../runtime';
export const takeOp = <TClaim extends Claim = TypedClaim>(key: ClaimKey) => {
console.log('takeOp');
let opMarkerValue = false;
const result = new Promise<TClaim | null>((resolve, reject) => {
@ -18,12 +17,10 @@ export const takeOp = <TClaim extends Claim = TypedClaim>(key: ClaimKey) => {
opMarkerValue = true;
} else {
if (isResolvedSlotOp(op)) {
console.log('takeOp-slotOp');
return;
}
if (!isResolvedTakeOp(op)) {
console.log('takeOp-error');
throw new Error('Take operation not found');
}

View File

@ -6,7 +6,7 @@ export const pushAwaitedTask = (op: PreparedOp) => {
awaitedTasks.push({ op, batchId: -1 });
};
export const getAwaitedTasks = () => awaitedTasks;
export const getAwaitedTasks = () => [...awaitedTasks];
export const markTaskBatch = (count: number, batchId: number) => {
for (let i = 1; i <= count; i++) {

View File

@ -1,9 +1,11 @@
import { Task } from '../../../types';
import { prepareTx } from './prepareTxs';
export const constructTx = (isFullyExecuted: boolean) => {
const { calls, txFee } = prepareTx(isFullyExecuted, 0n);
export const constructTx = (awaitedTasks: Task[], isFullyExecuted: boolean) => {
const { calls, txFee } = prepareTx({ awaitedTasks, isFullyExecuted, txFee: 0n });
const { txs } = prepareTx(isFullyExecuted, txFee, calls);
const { txs } = prepareTx({ awaitedTasks, isFullyExecuted, txFee, callsCount: calls });
return txs;
};

View File

@ -82,7 +82,6 @@ export const prepareInThreadTxs = ({
ops.forEach((op, i) => {
switch (true) {
case isPreparedExecOp(op): {
console.log('Child call info');
const id = op.ExecOp.id;
callArgs.push(constructBlock([constructResultBlockFilter(id)]), constructResultClaimTake(id));
@ -192,8 +191,6 @@ export const prepareInThreadTxs = ({
if ('StoreOp' in latestCallArg && (latestCallArg.StoreOp.key.first_part as [string])[0] === resultKey) {
//SAVE RESULT CLAIMS
console.log('SAVE RESULT CLAIMS', JSON.stringify(callArgs));
if (callArgs.length > 1) {
throw new Error('Unexpected count of result ops');
}

View File

@ -43,8 +43,6 @@ export const prepareOutThreadTxs = ({
ops.forEach((op) => {
switch (true) {
case isPreparedExecOp(op): {
console.log('Sibling call info');
const id = op.ExecOp.id;
const callInfo = {

View File

@ -1,23 +1,23 @@
import { constructContinueTx, NewTx, passCwebFrom, sendCwebInterface } from '@coinweb/contract-kit';
import { Task } from '../../../types';
import { context, getRawContext } from '../../context';
import { getAwaitedTasks } from '../../promisifiedFeatures';
import { prepareInThreadTxs } from './prepareInThreadTxs';
import { prepareOutThreadTxs } from './prepareOutThreadTxs';
import { splitTasks } from './splitTasks';
export const prepareTx = (
isFullyExecuted: boolean,
txFee: bigint,
callsCount?: number
): { txs: NewTx[]; calls: number; txFee: bigint } => {
console.log('Calls Count: ', callsCount);
const awaitedTasks = getAwaitedTasks();
console.log('Awaited Tasks: ', JSON.stringify(awaitedTasks));
export const prepareTx = ({
awaitedTasks,
isFullyExecuted,
txFee,
callsCount,
}: {
awaitedTasks: Task[];
isFullyExecuted: boolean;
txFee: bigint;
callsCount?: number;
}): { txs: NewTx[]; calls: number; txFee: bigint } => {
if (!awaitedTasks.length) {
if (context.isChild) {
return { txs: [], calls: 0, txFee: 0n };

View File

@ -29,137 +29,69 @@ import {
} from '../../types';
export const isResolvedSlotOp = (op?: ResolvedOp | null): op is ResolvedSlotOp => {
if (op && 'SlotOp' in op) {
console.log('isResolvedSlotOp >>> ', JSON.stringify(op));
}
return !!(op && 'SlotOp' in op);
};
export const isResolvedExecOp = (op?: ResolvedOp | null): op is ResolvedExecOp => {
if (op && 'ExecOp' in op) {
console.log('isResolvedExecOp >>> ', JSON.stringify(op));
}
return !!(op && 'ExecOp' in op);
};
export const isResolvedChildOp = (op?: ResolvedOp | null): op is ResolvedChildOp => {
if (op && 'ChildOp' in op) {
console.log('isResolvedChildOp >>> ', JSON.stringify(op));
}
return !!(op && 'ChildOp' in op);
};
export const isResolvedLockOp = (op?: ResolvedOp | null): op is ResolvedLockOp => {
if (op && 'LockOp' in op) {
console.log('isResolvedLockOp >>> ', JSON.stringify(op));
}
return !!(op && 'LockOp' in op);
};
export const isResolvedUnlockOp = (op?: ResolvedOp | null): op is ResolvedUnlockOp => {
if (op && 'UnlockOp' in op) {
console.log('isResolvedUnlockOp >>> ', JSON.stringify(op));
}
return !!(op && 'UnlockOp' in op);
};
export const isResolvedBlockOp = (op?: ResolvedOp | null): op is GBlock<ResolvedBlock> => {
if (op && 'BlockOp' in op) {
console.log('isResolvedBlockOp >>> ', JSON.stringify(op));
}
return isResolvedBlock(op as ResolvedOperation); //TODO: Fix contract-kit types
};
export const isResolvedStoreOp = (op?: ResolvedOp | null): op is GStore<CwebStore> => {
if (op && 'StoreOp' in op) {
console.log('isResolvedStoreOp >>> ', JSON.stringify(op));
}
return isResolvedStore(op as ResolvedOperation); //TODO: Fix contract-kit types
};
export const isResolvedCallOp = (op?: ResolvedOp | null): op is GCall<CwebCallRefResolved> => {
if (op && 'CallOp' in op) {
console.log('isResolvedCallOp >>> ', JSON.stringify(op));
}
return isResolvedCall(op as ResolvedOperation); //TODO: Fix contract-kit types
};
export const isResolvedTakeOp = (op?: ResolvedOp | null): op is GTake<ResolvedTake> => {
if (op && 'TakeOp' in op) {
console.log('isResolvedTakeOp >>> ', JSON.stringify(op));
}
return isResolvedTake(op as ResolvedOperation); //TODO: Fix contract-kit types
};
export const isResolvedReadOp = (op?: ResolvedOp | null): op is GRead<ResolvedRead> => {
if (op && 'ReadOp' in op) {
console.log('isResolvedReadOp >>> ', JSON.stringify(op));
}
return isResolvedRead(op as ResolvedOperation); //TODO: Fix contract-kit types
};
export const isPreparedExecOp = (op?: PreparedOp | null): op is PreparedExecOp => {
if (op && 'ExecOp' in op) {
console.log('isPreparedExecOp >>> ', JSON.stringify(op));
}
return !!(op && 'ExecOp' in op);
};
export const isPreparedBlockOp = (op?: PreparedOp | null): op is GBlock<CwebBlock> => {
if (op && 'BlockOp' in op) {
console.log('isPreparedBlockOp >>> ', JSON.stringify(op));
}
return !!(op && 'BlockOp' in op);
};
export const isPreparedLockOp = (op?: PreparedOp | null): op is PreparedLockOp => {
if (op && 'LockOp' in op) {
console.log('isPreparedLockOp >>> ', JSON.stringify(op));
}
return !!(op && 'LockOp' in op);
};
export const isPreparedUnlockOp = (op?: PreparedOp | null): op is PreparedUnlockOp => {
if (op && 'UnlockOp' in op) {
console.log('isPreparedUnlockOp >>> ', JSON.stringify(op));
}
return !!(op && 'UnlockOp' in op);
};
export const isPreparedStoreOp = (op?: PreparedOp | null): op is GStore<CwebStore> => {
if (op && 'StoreOp' in op) {
console.log('isPreparedStoreOp >>> ', JSON.stringify(op));
}
return !!(op && 'StoreOp' in op);
};
export const isPreparedExtendedStoreOp = (op?: PreparedOp | null): op is PreparedExtendedStoreOp => {
if (op && 'StoreOp' in op) {
console.log('isPreparedExtendedStoreOp >>> ', JSON.stringify(op));
}
return !!(op && 'StoreOp' in op);
};
export const isPreparedTakeOp = (op?: PreparedOp | null): op is GTake<CwebTake> => {
if (op && 'TakeOp' in op) {
console.log('isPreparedTakeOp >>> ', JSON.stringify(op));
}
return !!(op && 'TakeOp' in op);
};

View File

@ -1,4 +1,4 @@
VITE_API_URL='https://api-cloud.coinweb.io/wallet'
VITE_EXPLORER_URL='https://explorer.coinweb.io'
VITE_CONTRACT_ADDRESS="0x6195433d58b3415f2e4fe909c770a9ecc1c2f15ee59585daa06820a502fb9812"
VITE_CONTRACT_ADDRESS="0x5b7c1e53461f5497d467cb9d5c772be5e8c1a61b60e7f2f4ce71c7249d8e768a"